mz_sql/session/
vars.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Run-time configuration parameters
11//!
12//! ## Overview
13//! Materialize roughly follows the PostgreSQL configuration model, which works
14//! as follows. There is a global set of named configuration parameters, like
15//! `DateStyle` and `client_encoding`. These parameters can be set in several
16//! places: in an on-disk configuration file (in Postgres, named
17//! postgresql.conf), in command line arguments when the server is started, or
18//! at runtime via the `ALTER SYSTEM` or `SET` statements. Parameters that are
19//! set in a session take precedence over database defaults, which in turn take
20//! precedence over command line arguments, which in turn take precedence over
21//! settings in the on-disk configuration. Note that changing the value of
22//! parameters obeys transaction semantics: if a transaction fails to commit,
23//! any parameters that were changed in that transaction (i.e., via `SET`) will
24//! be rolled back to their previous value.
25//!
26//! The Materialize configuration hierarchy at the moment is much simpler.
27//! Global defaults are hardcoded into the binary, and a select few parameters
28//! can be overridden per session. A select few parameters can be overridden on
29//! disk.
30//!
31//! The set of variables that can be overridden per session and the set of
32//! variables that can be overridden on disk are currently disjoint. The
33//! infrastructure has been designed with an eye towards merging these two sets
34//! and supporting additional layers to the hierarchy, however, should the need
35//! arise.
36//!
37//! The configuration parameters that exist are driven by compatibility with
38//! PostgreSQL drivers that expect them, not because they are particularly
39//! important.
40//!
41//! ## Structure
42//! The most meaningful exports from this module are:
43//!
44//! - [`SessionVars`] represent per-session parameters, which each user can
45//!   access independently of one another, and are accessed via `SET`.
46//!
//!   The fields of [`SessionVars`] are either:
48//!     - `SessionVar`, which is preferable and simply requires full support of
49//!       the `SessionVar` impl for its embedded value type.
50//!     - `ServerVar` for types that do not currently support everything
51//!       required by `SessionVar`, e.g. they are fixed-value parameters.
52//!
53//!   In the fullness of time, all fields in [`SessionVars`] should be
54//!   `SessionVar`.
55//!
56//! - [`SystemVars`] represent system-wide configuration settings and are
57//!   accessed via `ALTER SYSTEM SET`.
58//!
59//!   All elements of [`SystemVars`] are `SystemVar`.
60//!
61//! Some [`VarDefinition`] are also marked as a [`FeatureFlag`]; this is just a
62//! wrapper to make working with a set of [`VarDefinition`] easier, primarily from
63//! within SQL planning, where we might want to check if a feature is enabled
64//! before planning it.
65
66use std::borrow::Cow;
67use std::clone::Clone;
68use std::collections::BTreeMap;
69use std::fmt::Debug;
70use std::net::IpAddr;
71use std::num::NonZeroU32;
72use std::string::ToString;
73use std::sync::{Arc, LazyLock};
74use std::time::Duration;
75
76use chrono::{DateTime, Utc};
77use derivative::Derivative;
78use im::OrdMap;
79use mz_build_info::BuildInfo;
80use mz_dyncfg::{ConfigSet, ConfigType, ConfigUpdates, ConfigVal};
81use mz_persist_client::cfg::{CRDB_CONNECT_TIMEOUT, CRDB_TCP_USER_TIMEOUT};
82use mz_repr::adt::numeric::Numeric;
83use mz_repr::adt::timestamp::CheckedTimestamp;
84use mz_repr::bytes::ByteSize;
85use mz_repr::user::ExternalUserMetadata;
86use mz_tracing::{CloneableEnvFilter, SerializableDirective};
87use serde::Serialize;
88use thiserror::Error;
89use tracing::error;
90use uncased::UncasedStr;
91
92use crate::ast::Ident;
93use crate::session::user::User;
94
95pub(crate) mod constraints;
96pub(crate) mod definitions;
97pub(crate) mod errors;
98pub(crate) mod polyfill;
99pub(crate) mod value;
100
101pub use definitions::*;
102pub use errors::*;
103pub use value::*;
104
/// The action to take during end_transaction.
///
/// This enum lives here because of convenience: it's more of an adapter
/// concept but [`SessionVars::end_transaction`] takes it.
///
/// On [`EndTransactionAction::Commit`] a staged value is promoted to the
/// session value; on [`EndTransactionAction::Rollback`] staged values are
/// discarded (see [`SessionVar::end_transaction`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EndTransactionAction {
    /// Commit the transaction.
    Commit,
    /// Rollback the transaction.
    Rollback,
}
116
/// Represents the input to a variable.
///
/// Each variable has different rules for how it handles each style of input.
/// This type allows us to defer interpretation of the input until the
/// variable-specific interpretation can be applied.
#[derive(Debug, Clone, Copy)]
pub enum VarInput<'a> {
    /// The input has been flattened into a single string.
    Flat(&'a str),
    /// The input comes from a SQL `SET` statement and is jumbled across
    /// multiple components.
    SqlSet(&'a [String]),
}

impl VarInput<'_> {
    /// Converts the variable input to an owned vector of strings.
    ///
    /// A [`VarInput::Flat`] input yields a single-element vector; a
    /// [`VarInput::SqlSet`] input yields a copy of every component, in order.
    pub fn to_vec(&self) -> Vec<String> {
        match *self {
            VarInput::Flat(v) => vec![v.to_owned()],
            // `slice::to_vec` clones the whole slice in one step, replacing the
            // previous `into_iter()`-on-a-reference plus per-element `to_string`.
            VarInput::SqlSet(values) => values.to_vec(),
        }
    }
}
140
/// An owned version of [`VarInput`].
///
/// Use [`OwnedVarInput::borrow`] to obtain the borrowed [`VarInput`] view.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum OwnedVarInput {
    /// See [`VarInput::Flat`].
    Flat(String),
    /// See [`VarInput::SqlSet`].
    SqlSet(Vec<String>),
}
149
150impl OwnedVarInput {
151    /// Converts this owned variable input as a [`VarInput`].
152    pub fn borrow(&self) -> VarInput<'_> {
153        match self {
154            OwnedVarInput::Flat(v) => VarInput::Flat(v),
155            OwnedVarInput::SqlSet(v) => VarInput::SqlSet(v),
156        }
157    }
158}
159
/// A `Var` represents a configuration parameter of an arbitrary type.
///
/// Implementors are handed out as `&dyn Var` (see [`Var::as_var`]), so callers
/// only ever see the string-oriented view of a parameter exposed here.
pub trait Var: Debug {
    /// Returns the name of the configuration parameter.
    fn name(&self) -> &'static str;

    /// Constructs a flattened string representation of the current value of the
    /// configuration parameter.
    ///
    /// The resulting string is guaranteed to be parsable if provided to
    /// `Value::parse` as a [`VarInput::Flat`].
    fn value(&self) -> String;

    /// Returns a short sentence describing the purpose of the configuration
    /// parameter.
    fn description(&self) -> &'static str;

    /// Returns the name of the type of this variable.
    fn type_name(&self) -> Cow<'static, str>;

    /// Indicates whether the [`Var`] is visible as a function of the `user` and `system_vars`.
    /// "Invisible" parameters return `VarErrors`.
    ///
    /// Variables marked as `internal` are only visible for the system user.
    fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError>;

    /// Reports whether the variable is only visible in unsafe mode.
    ///
    /// The default implementation treats every variable whose name starts with
    /// `unsafe_` as unsafe.
    fn is_unsafe(&self) -> bool {
        self.name().starts_with("unsafe_")
    }

    /// Upcast `self` to a `dyn Var`, useful when working with multiple different implementors of
    /// [`Var`].
    fn as_var(&self) -> &dyn Var
    where
        Self: Sized,
    {
        self
    }
}
199
/// A `SessionVar` is the session value for a configuration parameter. If unset,
/// the server default is used instead.
///
/// The effective value is resolved in this order (see [`SessionVar::value_dyn`]):
/// `local_value`, `staged_value`, `session_value`, `default_value`, and finally
/// the value carried by the `definition` itself.
///
/// Note: even though all of the different `*_value` fields are `Box<dyn Value>` they are enforced
/// to be the same type because we use the `definition`s `parse(...)` method. This is guaranteed to
/// return the same type as the compiled in default.
#[derive(Debug)]
pub struct SessionVar {
    /// The definition this variable was created from; used for parsing and constraints.
    definition: VarDefinition,
    /// System or Role default value.
    default_value: Option<Box<dyn Value>>,
    /// Value `LOCAL` to a transaction, will be unset at the completion of the transaction.
    local_value: Option<Box<dyn Value>>,
    /// Value set during a transaction, will be set if the transaction is committed.
    staged_value: Option<Box<dyn Value>>,
    /// Value that overrides the default.
    session_value: Option<Box<dyn Value>>,
}
218
219impl Clone for SessionVar {
220    fn clone(&self) -> Self {
221        SessionVar {
222            definition: self.definition.clone(),
223            default_value: self.default_value.as_ref().map(|v| v.box_clone()),
224            local_value: self.local_value.as_ref().map(|v| v.box_clone()),
225            staged_value: self.staged_value.as_ref().map(|v| v.box_clone()),
226            session_value: self.session_value.as_ref().map(|v| v.box_clone()),
227        }
228    }
229}
230
231impl SessionVar {
232    pub const fn new(var: VarDefinition) -> Self {
233        SessionVar {
234            definition: var,
235            default_value: None,
236            local_value: None,
237            staged_value: None,
238            session_value: None,
239        }
240    }
241
242    /// Checks if the provided [`VarInput`] is valid for the current session variable, returning
243    /// the formatted output if it's valid.
244    pub fn check(&self, input: VarInput) -> Result<String, VarError> {
245        let v = self.definition.parse(input)?;
246        self.validate_constraints(v.as_ref())?;
247
248        Ok(v.format())
249    }
250
251    /// Parse the input and update the stored value to match.
252    pub fn set(&mut self, input: VarInput, local: bool) -> Result<(), VarError> {
253        let v = self.definition.parse(input)?;
254
255        // Validate our parsed value.
256        self.validate_constraints(v.as_ref())?;
257
258        if local {
259            self.local_value = Some(v);
260        } else {
261            self.local_value = None;
262            self.staged_value = Some(v);
263        }
264        Ok(())
265    }
266
267    /// Sets the default value for the variable.
268    pub fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
269        let v = self.definition.parse(input)?;
270        self.validate_constraints(v.as_ref())?;
271        self.default_value = Some(v);
272        Ok(())
273    }
274
275    /// Reset the stored value to the default.
276    pub fn reset(&mut self, local: bool) {
277        let value = self
278            .default_value
279            .as_ref()
280            .map(|v| v.as_ref())
281            .unwrap_or_else(|| self.definition.value.value());
282        if local {
283            self.local_value = Some(value.box_clone());
284        } else {
285            self.local_value = None;
286            self.staged_value = Some(value.box_clone());
287        }
288    }
289
290    /// Returns a possibly new SessionVar if this needs to mutate at transaction end.
291    #[must_use]
292    pub fn end_transaction(&self, action: EndTransactionAction) -> Option<Self> {
293        if !self.is_mutating() {
294            return None;
295        }
296        let mut next: Self = self.clone();
297        next.local_value = None;
298        match action {
299            EndTransactionAction::Commit if next.staged_value.is_some() => {
300                next.session_value = next.staged_value.take()
301            }
302            _ => next.staged_value = None,
303        }
304        Some(next)
305    }
306
307    /// Whether this Var needs to mutate at the end of a transaction.
308    pub fn is_mutating(&self) -> bool {
309        self.local_value.is_some() || self.staged_value.is_some()
310    }
311
312    pub fn value_dyn(&self) -> &dyn Value {
313        self.local_value
314            .as_deref()
315            .or(self.staged_value.as_deref())
316            .or(self.session_value.as_deref())
317            .or(self.default_value.as_deref())
318            .unwrap_or_else(|| self.definition.value.value())
319    }
320
321    /// Returns the [`Value`] that is currently stored as the `session_value`.
322    ///
323    /// Note: This should __only__ be used for inspection, if you want to determine the current
324    /// value of this [`SessionVar`] you should use [`SessionVar::value`].
325    pub fn inspect_session_value(&self) -> Option<&dyn Value> {
326        self.session_value.as_deref()
327    }
328
329    fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
330        if let Some(constraint) = &self.definition.constraint {
331            constraint.check_constraint(self, self.value_dyn(), val)
332        } else {
333            Ok(())
334        }
335    }
336}
337
338impl Var for SessionVar {
339    fn name(&self) -> &'static str {
340        self.definition.name.as_str()
341    }
342
343    fn value(&self) -> String {
344        self.value_dyn().format()
345    }
346
347    fn description(&self) -> &'static str {
348        self.definition.description
349    }
350
351    fn type_name(&self) -> Cow<'static, str> {
352        self.definition.type_name()
353    }
354
355    fn visible(
356        &self,
357        user: &User,
358        system_vars: &super::vars::SystemVars,
359    ) -> Result<(), super::vars::VarError> {
360        self.definition.visible(user, system_vars)
361    }
362}
363
/// Version information for the running binary.
///
/// [`SessionVars`] hands this out as the variable named by `MZ_VERSION_NAME`
/// (see [`SessionVars::get`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MzVersion {
    /// Inputs to computed variables.
    build_info: &'static BuildInfo,
    /// Helm chart version
    helm_chart_version: Option<String>,
}
371
372impl MzVersion {
373    pub fn new(build_info: &'static BuildInfo, helm_chart_version: Option<String>) -> Self {
374        MzVersion {
375            build_info,
376            helm_chart_version,
377        }
378    }
379}
380
/// Session variables.
///
/// See the [`crate::session::vars`] module documentation for more details on the
/// Materialize configuration model.
#[derive(Debug, Clone)]
pub struct SessionVars {
    /// The set of all session variables, keyed by case-insensitive name.
    vars: OrdMap<&'static UncasedStr, SessionVar>,
    /// Inputs to computed variables; also served directly as the `mz_version` variable.
    mz_version: MzVersion,
    /// Information about the user associated with this Session; also served
    /// directly as the `is_superuser` variable.
    user: User,
}
394
395impl SessionVars {
396    /// Creates a new [`SessionVars`] without considering the System or Role defaults.
397    pub fn new_unchecked(
398        build_info: &'static BuildInfo,
399        user: User,
400        helm_chart_version: Option<String>,
401    ) -> SessionVars {
402        use definitions::*;
403
404        let vars = [
405            &FAILPOINTS,
406            &SERVER_VERSION,
407            &SERVER_VERSION_NUM,
408            &SQL_SAFE_UPDATES,
409            &REAL_TIME_RECENCY,
410            &EMIT_PLAN_INSIGHTS_NOTICE,
411            &EMIT_TIMESTAMP_NOTICE,
412            &EMIT_TRACE_ID_NOTICE,
413            &AUTO_ROUTE_CATALOG_QUERIES,
414            &ENABLE_SESSION_RBAC_CHECKS,
415            &ENABLE_SESSION_CARDINALITY_ESTIMATES,
416            &MAX_IDENTIFIER_LENGTH,
417            &STATEMENT_LOGGING_SAMPLE_RATE,
418            &EMIT_INTROSPECTION_QUERY_NOTICE,
419            &UNSAFE_NEW_TRANSACTION_WALL_TIME,
420            &WELCOME_MESSAGE,
421        ]
422        .into_iter()
423        .chain(SESSION_SYSTEM_VARS.iter().map(|(_name, var)| *var))
424        .map(|var| (var.name, SessionVar::new(var.clone())))
425        .collect();
426
427        SessionVars {
428            vars,
429            mz_version: MzVersion::new(build_info, helm_chart_version),
430            user,
431        }
432    }
433
434    fn expect_value<V: Value>(&self, var: &VarDefinition) -> &V {
435        let var = self
436            .vars
437            .get(var.name)
438            .expect("provided var should be in state");
439        let val = var.value_dyn();
440        val.as_any().downcast_ref::<V>().expect("success")
441    }
442
443    /// Returns an iterator over the configuration parameters and their current
444    /// values for this session.
445    ///
446    /// Note that this function does not check that the access variable should
447    /// be visible because of other settings or users. Before or after accessing
448    /// this method, you should call `Var::visible`.
449    pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
450        #[allow(clippy::as_conversions)]
451        self.vars
452            .values()
453            .map(|v| v.as_var())
454            .chain([&self.mz_version as &dyn Var, &self.user])
455    }
456
457    /// Returns an iterator over configuration parameters (and their current
458    /// values for this session) that are expected to be sent to the client when
459    /// a new connection is established or when their value changes.
460    pub fn notify_set(&self) -> impl Iterator<Item = &dyn Var> {
461        // WARNING: variables in this set are not checked for visibility, and
462        // are assumed to be visible for all sessions.
463        //
464        // This is fixible with some elbow grease, but at the moment it seems
465        // unlikely that we'll have a variable in the notify set that shouldn't
466        // be visible to all sessions.
467        [
468            &APPLICATION_NAME,
469            &CLIENT_ENCODING,
470            &DATE_STYLE,
471            &INTEGER_DATETIMES,
472            &SERVER_VERSION,
473            &STANDARD_CONFORMING_STRINGS,
474            &TIMEZONE,
475            &INTERVAL_STYLE,
476            // Including `cluster`, `cluster_replica`, `database`, and `search_path` in the notify
477            // set is a Materialize extension. Doing so allows users to more easily identify where
478            // their queries will be executing, which is important to know when you consider the
479            // size of a cluster, what indexes are present, etc.
480            &CLUSTER,
481            &CLUSTER_REPLICA,
482            &DEFAULT_CLUSTER_REPLICATION_FACTOR,
483            &DATABASE,
484            &SEARCH_PATH,
485        ]
486        .into_iter()
487        .map(|v| self.vars[v.name].as_var())
488        // Including `mz_version` in the notify set is a Materialize
489        // extension. Doing so allows applications to detect whether they
490        // are talking to Materialize or PostgreSQL without an additional
491        // network roundtrip. This is known to be safe because CockroachDB
492        // has an analogous extension [0].
493        // [0]: https://github.com/cockroachdb/cockroach/blob/369c4057a/pkg/sql/pgwire/conn.go#L1840
494        .chain(std::iter::once(self.mz_version.as_var()))
495    }
496
497    /// Resets all variables to their default value.
498    pub fn reset_all(&mut self) {
499        let names: Vec<_> = self.vars.keys().copied().collect();
500        for name in names {
501            self.vars[name].reset(false);
502        }
503    }
504
505    /// Returns a [`Var`] representing the configuration parameter with the
506    /// specified name.
507    ///
508    /// Configuration parameters are matched case insensitively. If no such
509    /// configuration parameter exists, `get` returns an error.
510    ///
511    /// Note that if `name` is known at compile time, you should instead use the
512    /// named accessor to access the variable with its true Rust type. For
513    /// example, `self.get("sql_safe_updates").value()` returns the string
514    /// `"true"` or `"false"`, while `self.sql_safe_updates()` returns a bool.
515    pub fn get(&self, system_vars: &SystemVars, name: &str) -> Result<&dyn Var, VarError> {
516        let name = compat_translate_name(name);
517
518        let name = UncasedStr::new(name);
519        if name == MZ_VERSION_NAME {
520            Ok(&self.mz_version)
521        } else if name == IS_SUPERUSER_NAME {
522            Ok(&self.user)
523        } else {
524            self.vars
525                .get(name)
526                .map(|v| {
527                    v.visible(&self.user, system_vars)?;
528                    Ok(v.as_var())
529                })
530                .transpose()?
531                .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
532        }
533    }
534
535    /// Returns a [`SessionVar`] for inspection.
536    ///
537    /// Note: If you're trying to determine the value of the variable with `name` you should
538    /// instead use the named accessor, or [`SessionVars::get`].
539    pub fn inspect(&self, name: &str) -> Result<&SessionVar, VarError> {
540        let name = compat_translate_name(name);
541
542        self.vars
543            .get(UncasedStr::new(name))
544            .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
545    }
546
547    /// Sets the configuration parameter named `name` to the value represented
548    /// by `value`.
549    ///
550    /// The new value may be either committed or rolled back by the next call to
551    /// [`SessionVars::end_transaction`]. If `local` is true, the new value is always
552    /// discarded by the next call to [`SessionVars::end_transaction`], even if the
553    /// transaction is marked to commit.
554    ///
555    /// Like with [`SessionVars::get`], configuration parameters are matched case
556    /// insensitively. If `value` is not valid, as determined by the underlying
557    /// configuration parameter, or if the named configuration parameter does
558    /// not exist, an error is returned.
559    pub fn set(
560        &mut self,
561        system_vars: &SystemVars,
562        name: &str,
563        input: VarInput,
564        local: bool,
565    ) -> Result<(), VarError> {
566        let (name, input) = compat_translate(name, input);
567
568        let name = UncasedStr::new(name);
569        self.check_read_only(name)?;
570
571        self.vars
572            .get_mut(name)
573            .map(|v| {
574                v.visible(&self.user, system_vars)?;
575                v.set(input, local)
576            })
577            .transpose()?
578            .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
579    }
580
581    /// Sets the default value for the parameter named `name` to the value
582    /// represented by `value`.
583    pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
584        let (name, input) = compat_translate(name, input);
585
586        let name = UncasedStr::new(name);
587        self.check_read_only(name)?;
588
589        self.vars
590            .get_mut(name)
591            // Note: visibility is checked when persisting a role default.
592            .map(|v| v.set_default(input))
593            .transpose()?
594            .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
595    }
596
597    /// Sets the configuration parameter named `name` to its default value.
598    ///
599    /// The new value may be either committed or rolled back by the next call to
600    /// [`SessionVars::end_transaction`]. If `local` is true, the new value is
601    /// always discarded by the next call to [`SessionVars::end_transaction`],
602    /// even if the transaction is marked to commit.
603    ///
604    /// Like with [`SessionVars::get`], configuration parameters are matched
605    /// case insensitively. If the named configuration parameter does not exist,
606    /// an error is returned.
607    ///
608    /// If the variable does not exist or the user does not have the visibility
609    /// requires, this function returns an error.
610    pub fn reset(
611        &mut self,
612        system_vars: &SystemVars,
613        name: &str,
614        local: bool,
615    ) -> Result<(), VarError> {
616        let name = compat_translate_name(name);
617
618        let name = UncasedStr::new(name);
619        self.check_read_only(name)?;
620
621        self.vars
622            .get_mut(name)
623            .map(|v| {
624                v.visible(&self.user, system_vars)?;
625                v.reset(local);
626                Ok(())
627            })
628            .transpose()?
629            .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
630    }
631
632    /// Returns an error if the variable corresponding to `name` is read only.
633    fn check_read_only(&self, name: &UncasedStr) -> Result<(), VarError> {
634        if name == MZ_VERSION_NAME {
635            Err(VarError::ReadOnlyParameter(MZ_VERSION_NAME.as_str()))
636        } else if name == IS_SUPERUSER_NAME {
637            Err(VarError::ReadOnlyParameter(IS_SUPERUSER_NAME.as_str()))
638        } else if name == MAX_IDENTIFIER_LENGTH.name {
639            Err(VarError::ReadOnlyParameter(
640                MAX_IDENTIFIER_LENGTH.name.as_str(),
641            ))
642        } else {
643            Ok(())
644        }
645    }
646
647    /// Commits or rolls back configuration parameter updates made via
648    /// [`SessionVars::set`] since the last call to `end_transaction`.
649    ///
650    /// Returns any session parameters that changed because the transaction ended.
651    #[mz_ore::instrument(level = "debug")]
652    pub fn end_transaction(
653        &mut self,
654        action: EndTransactionAction,
655    ) -> BTreeMap<&'static str, String> {
656        let mut changed = BTreeMap::new();
657        let mut updates = Vec::new();
658        for (name, var) in self.vars.iter() {
659            if !var.is_mutating() {
660                continue;
661            }
662            let before = var.value();
663            let next = var.end_transaction(action).expect("must mutate");
664            let after = next.value();
665            updates.push((*name, next));
666
667            // Report the new value of the parameter.
668            if before != after {
669                changed.insert(var.name(), after);
670            }
671        }
672        self.vars.extend(updates);
673        changed
674    }
675
676    /// Returns the value of the `application_name` configuration parameter.
677    pub fn application_name(&self) -> &str {
678        self.expect_value::<String>(&APPLICATION_NAME).as_str()
679    }
680
681    /// Returns the build info.
682    pub fn build_info(&self) -> &'static BuildInfo {
683        self.mz_version.build_info
684    }
685
686    /// Returns the value of the `client_encoding` configuration parameter.
687    pub fn client_encoding(&self) -> &ClientEncoding {
688        self.expect_value(&CLIENT_ENCODING)
689    }
690
691    /// Returns the value of the `client_min_messages` configuration parameter.
692    pub fn client_min_messages(&self) -> &ClientSeverity {
693        self.expect_value(&CLIENT_MIN_MESSAGES)
694    }
695
696    /// Returns the value of the `cluster` configuration parameter.
697    pub fn cluster(&self) -> &str {
698        self.expect_value::<String>(&CLUSTER).as_str()
699    }
700
701    /// Returns the value of the `cluster_replica` configuration parameter.
702    pub fn cluster_replica(&self) -> Option<&str> {
703        self.expect_value::<Option<String>>(&CLUSTER_REPLICA)
704            .as_deref()
705    }
706
707    /// Returns the value of the `current_object_missing_warnings` configuration
708    /// parameter.
709    pub fn current_object_missing_warnings(&self) -> bool {
710        *self.expect_value::<bool>(&CURRENT_OBJECT_MISSING_WARNINGS)
711    }
712
713    /// Returns the value of the `DateStyle` configuration parameter.
714    pub fn date_style(&self) -> &[&str] {
715        &self.expect_value::<DateStyle>(&DATE_STYLE).0
716    }
717
718    /// Returns the value of the `database` configuration parameter.
719    pub fn database(&self) -> &str {
720        self.expect_value::<String>(&DATABASE).as_str()
721    }
722
723    /// Returns the value of the `extra_float_digits` configuration parameter.
724    pub fn extra_float_digits(&self) -> i32 {
725        *self.expect_value(&EXTRA_FLOAT_DIGITS)
726    }
727
728    /// Returns the value of the `integer_datetimes` configuration parameter.
729    pub fn integer_datetimes(&self) -> bool {
730        *self.expect_value(&INTEGER_DATETIMES)
731    }
732
733    /// Returns the value of the `intervalstyle` configuration parameter.
734    pub fn intervalstyle(&self) -> &IntervalStyle {
735        self.expect_value(&INTERVAL_STYLE)
736    }
737
738    /// Returns the value of the `mz_version` configuration parameter.
739    pub fn mz_version(&self) -> String {
740        self.mz_version.value()
741    }
742
743    /// Returns the value of the `search_path` configuration parameter.
744    pub fn search_path(&self) -> &[Ident] {
745        self.expect_value::<Vec<Ident>>(&SEARCH_PATH).as_slice()
746    }
747
748    /// Returns the value of the `server_version` configuration parameter.
749    pub fn server_version(&self) -> &str {
750        self.expect_value::<String>(&SERVER_VERSION).as_str()
751    }
752
753    /// Returns the value of the `server_version_num` configuration parameter.
754    pub fn server_version_num(&self) -> i32 {
755        *self.expect_value(&SERVER_VERSION_NUM)
756    }
757
758    /// Returns the value of the `sql_safe_updates` configuration parameter.
759    pub fn sql_safe_updates(&self) -> bool {
760        *self.expect_value(&SQL_SAFE_UPDATES)
761    }
762
763    /// Returns the value of the `standard_conforming_strings` configuration
764    /// parameter.
765    pub fn standard_conforming_strings(&self) -> bool {
766        *self.expect_value(&STANDARD_CONFORMING_STRINGS)
767    }
768
769    /// Returns the value of the `statement_timeout` configuration parameter.
770    pub fn statement_timeout(&self) -> &Duration {
771        self.expect_value(&STATEMENT_TIMEOUT)
772    }
773
774    /// Returns the value of the `idle_in_transaction_session_timeout` configuration parameter.
775    pub fn idle_in_transaction_session_timeout(&self) -> &Duration {
776        self.expect_value(&IDLE_IN_TRANSACTION_SESSION_TIMEOUT)
777    }
778
779    /// Returns the value of the `timezone` configuration parameter.
780    pub fn timezone(&self) -> &TimeZone {
781        self.expect_value(&TIMEZONE)
782    }
783
784    /// Returns the value of the `transaction_isolation` configuration
785    /// parameter.
786    pub fn transaction_isolation(&self) -> &IsolationLevel {
787        self.expect_value(&TRANSACTION_ISOLATION)
788    }
789
790    /// Returns the value of `real_time_recency` configuration parameter.
791    pub fn real_time_recency(&self) -> bool {
792        *self.expect_value(&REAL_TIME_RECENCY)
793    }
794
795    /// Returns the value of the `real_time_recency_timeout` configuration parameter.
796    pub fn real_time_recency_timeout(&self) -> &Duration {
797        self.expect_value(&REAL_TIME_RECENCY_TIMEOUT)
798    }
799
800    /// Returns the value of `emit_plan_insights_notice` configuration parameter.
801    pub fn emit_plan_insights_notice(&self) -> bool {
802        *self.expect_value(&EMIT_PLAN_INSIGHTS_NOTICE)
803    }
804
805    /// Returns the value of `emit_timestamp_notice` configuration parameter.
806    pub fn emit_timestamp_notice(&self) -> bool {
807        *self.expect_value(&EMIT_TIMESTAMP_NOTICE)
808    }
809
810    /// Returns the value of `emit_trace_id_notice` configuration parameter.
811    pub fn emit_trace_id_notice(&self) -> bool {
812        *self.expect_value(&EMIT_TRACE_ID_NOTICE)
813    }
814
815    /// Returns the value of `auto_route_catalog_queries` configuration parameter.
816    pub fn auto_route_catalog_queries(&self) -> bool {
817        *self.expect_value(&AUTO_ROUTE_CATALOG_QUERIES)
818    }
819
820    /// Returns the value of `enable_session_rbac_checks` configuration parameter.
821    pub fn enable_session_rbac_checks(&self) -> bool {
822        *self.expect_value(&ENABLE_SESSION_RBAC_CHECKS)
823    }
824
825    /// Returns the value of `enable_session_cardinality_estimates` configuration parameter.
826    pub fn enable_session_cardinality_estimates(&self) -> bool {
827        *self.expect_value(&ENABLE_SESSION_CARDINALITY_ESTIMATES)
828    }
829
830    /// Returns the value of `is_superuser` configuration parameter.
831    pub fn is_superuser(&self) -> bool {
832        self.user.is_superuser()
833    }
834
835    /// Returns the user associated with this `SessionVars` instance.
836    pub fn user(&self) -> &User {
837        &self.user
838    }
839
840    /// Returns the value of the `max_query_result_size` configuration parameter.
841    pub fn max_query_result_size(&self) -> u64 {
842        self.expect_value::<ByteSize>(&MAX_QUERY_RESULT_SIZE)
843            .as_bytes()
844    }
845
846    /// Sets the external metadata associated with the user.
847    pub fn set_external_user_metadata(&mut self, metadata: ExternalUserMetadata) {
848        self.user.external_metadata = Some(metadata);
849    }
850
851    pub fn set_cluster(&mut self, cluster: String) {
852        let var = self
853            .vars
854            .get_mut(UncasedStr::new(CLUSTER.name()))
855            .expect("cluster variable must exist");
856        var.set(VarInput::Flat(&cluster), false)
857            .expect("setting cluster must succeed");
858    }
859
860    pub fn set_local_transaction_isolation(&mut self, transaction_isolation: IsolationLevel) {
861        let var = self
862            .vars
863            .get_mut(UncasedStr::new(TRANSACTION_ISOLATION.name()))
864            .expect("transaction_isolation variable must exist");
865        var.set(VarInput::Flat(transaction_isolation.as_str()), true)
866            .expect("setting transaction isolation must succeed");
867    }
868
869    pub fn get_statement_logging_sample_rate(&self) -> Numeric {
870        *self.expect_value(&STATEMENT_LOGGING_SAMPLE_RATE)
871    }
872
873    /// Returns the value of the `emit_introspection_query_notice` configuration parameter.
874    pub fn emit_introspection_query_notice(&self) -> bool {
875        *self.expect_value(&EMIT_INTROSPECTION_QUERY_NOTICE)
876    }
877
878    pub fn unsafe_new_transaction_wall_time(&self) -> Option<CheckedTimestamp<DateTime<Utc>>> {
879        *self.expect_value(&UNSAFE_NEW_TRANSACTION_WALL_TIME)
880    }
881
882    /// Returns the value of the `welcome_message` configuration parameter.
883    pub fn welcome_message(&self) -> bool {
884        *self.expect_value(&WELCOME_MESSAGE)
885    }
886}
887
// TODO(database-issues#8069) remove together with `compat_translate`
/// Deprecated value of the `cluster` variable; `compat_translate` rewrites it
/// to `mz_catalog_server`.
pub const OLD_CATALOG_SERVER_CLUSTER: &str = "mz_introspection";
/// Deprecated variable name; `compat_translate` rewrites it to the name of
/// `AUTO_ROUTE_CATALOG_QUERIES`.
pub const OLD_AUTO_ROUTE_CATALOG_QUERIES: &str = "auto_route_introspection_queries";
891
892/// If the given variable name and/or input is deprecated, return a corresponding updated value,
893/// otherwise return the original.
894///
895/// This method was introduced to gracefully handle the rename of the `mz_introspection` cluster to
896/// `mz_cluster_server`. The plan is to remove it once all users have migrated to the new name. The
897/// debug logs will be helpful for checking this in production.
898// TODO(database-issues#8069) remove this after sufficient time has passed
899fn compat_translate<'a, 'b>(name: &'a str, input: VarInput<'b>) -> (&'a str, VarInput<'b>) {
900    if name == CLUSTER.name() {
901        if let Ok(value) = CLUSTER.parse(input) {
902            if value.format() == OLD_CATALOG_SERVER_CLUSTER {
903                tracing::debug!(
904                    github_27285 = true,
905                    "encountered deprecated `cluster` variable value: {}",
906                    OLD_CATALOG_SERVER_CLUSTER,
907                );
908                return (name, VarInput::Flat("mz_catalog_server"));
909            }
910        }
911    }
912
913    if name == OLD_AUTO_ROUTE_CATALOG_QUERIES {
914        tracing::debug!(
915            github_27285 = true,
916            "encountered deprecated `{}` variable name",
917            OLD_AUTO_ROUTE_CATALOG_QUERIES,
918        );
919        return (AUTO_ROUTE_CATALOG_QUERIES.name(), input);
920    }
921
922    (name, input)
923}
924
925fn compat_translate_name(name: &str) -> &str {
926    let (name, _) = compat_translate(name, VarInput::Flat(""));
927    name
928}
929
/// A `SystemVar` holds the value of a configuration parameter that is persisted
/// on disk. If no value is persisted, the dynamic default is used, and failing
/// that the default from the variable's definition.
#[derive(Debug)]
pub struct SystemVar {
    /// Static definition (name, description, default, constraint) of the variable.
    definition: VarDefinition,
    /// Value currently persisted to disk.
    persisted_value: Option<Box<dyn Value>>,
    /// Current default, not persisted to disk.
    dynamic_default: Option<Box<dyn Value>>,
}
940
941impl Clone for SystemVar {
942    fn clone(&self) -> Self {
943        SystemVar {
944            definition: self.definition.clone(),
945            persisted_value: self.persisted_value.as_ref().map(|v| v.box_clone()),
946            dynamic_default: self.dynamic_default.as_ref().map(|v| v.box_clone()),
947        }
948    }
949}
950
951impl SystemVar {
952    pub fn new(definition: VarDefinition) -> Self {
953        SystemVar {
954            definition,
955            persisted_value: None,
956            dynamic_default: None,
957        }
958    }
959
960    fn is_default(&self, input: VarInput) -> Result<bool, VarError> {
961        let v = self.definition.parse(input)?;
962        Ok(self.definition.default_value() == v.as_ref())
963    }
964
965    pub fn value_dyn(&self) -> &dyn Value {
966        self.persisted_value
967            .as_deref()
968            .or(self.dynamic_default.as_deref())
969            .unwrap_or_else(|| self.definition.default_value())
970    }
971
972    pub fn value<V: 'static>(&self) -> &V {
973        let val = self.value_dyn();
974        val.as_any().downcast_ref::<V>().expect("success")
975    }
976
977    fn parse(&self, input: VarInput) -> Result<Box<dyn Value>, VarError> {
978        let v = self.definition.parse(input)?;
979        // Validate our parsed value.
980        self.validate_constraints(v.as_ref())?;
981        Ok(v)
982    }
983
984    fn set(&mut self, input: VarInput) -> Result<bool, VarError> {
985        let v = self.parse(input)?;
986
987        if self.persisted_value.as_ref() != Some(&v) {
988            self.persisted_value = Some(v);
989            Ok(true)
990        } else {
991            Ok(false)
992        }
993    }
994
995    fn reset(&mut self) -> bool {
996        if self.persisted_value.is_some() {
997            self.persisted_value = None;
998            true
999        } else {
1000            false
1001        }
1002    }
1003
1004    fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
1005        let v = self.definition.parse(input)?;
1006        self.dynamic_default = Some(v);
1007        Ok(())
1008    }
1009
1010    fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
1011        if let Some(constraint) = &self.definition.constraint {
1012            constraint.check_constraint(self, self.value_dyn(), val)
1013        } else {
1014            Ok(())
1015        }
1016    }
1017}
1018
1019impl Var for SystemVar {
1020    fn name(&self) -> &'static str {
1021        self.definition.name.as_str()
1022    }
1023
1024    fn value(&self) -> String {
1025        self.value_dyn().format()
1026    }
1027
1028    fn description(&self) -> &'static str {
1029        self.definition.description
1030    }
1031
1032    fn type_name(&self) -> Cow<'static, str> {
1033        self.definition.type_name()
1034    }
1035
1036    fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError> {
1037        self.definition.visible(user, system_vars)
1038    }
1039}
1040
/// Errors produced when a network policy rejects a connection.
#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    /// The given IP address was denied access.
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
}
1046
/// On disk variables.
///
/// See the [`crate::session::vars`] module documentation for more details on the
/// Materialize configuration model.
#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct SystemVars {
    /// Allows "unsafe" parameters to be set.
    allow_unsafe: bool,
    /// Set of all [`SystemVar`]s, keyed by case-insensitive variable name.
    vars: BTreeMap<&'static UncasedStr, SystemVar>,
    /// External components interested in when a [`SystemVar`] gets updated,
    /// keyed by variable name. Skipped in the `Debug` output.
    #[derivative(Debug = "ignore")]
    callbacks: BTreeMap<String, Vec<Arc<dyn Fn(&SystemVars) + Send + Sync>>>,

    /// NB: This is intentionally disconnected from the one that is plumbed around to persist and
    /// the controllers. This is so we can explicitly control and reason about when changes to config
    /// values are propagated to the rest of the system.
    dyncfgs: ConfigSet,
}
1067
1068impl Default for SystemVars {
1069    fn default() -> Self {
1070        Self::new()
1071    }
1072}
1073
1074impl SystemVars {
1075    pub fn new() -> Self {
1076        let system_vars = vec![
1077            &MAX_KAFKA_CONNECTIONS,
1078            &MAX_POSTGRES_CONNECTIONS,
1079            &MAX_MYSQL_CONNECTIONS,
1080            &MAX_SQL_SERVER_CONNECTIONS,
1081            &MAX_AWS_PRIVATELINK_CONNECTIONS,
1082            &MAX_TABLES,
1083            &MAX_SOURCES,
1084            &MAX_SINKS,
1085            &MAX_MATERIALIZED_VIEWS,
1086            &MAX_CLUSTERS,
1087            &MAX_REPLICAS_PER_CLUSTER,
1088            &MAX_CREDIT_CONSUMPTION_RATE,
1089            &MAX_DATABASES,
1090            &MAX_SCHEMAS_PER_DATABASE,
1091            &MAX_OBJECTS_PER_SCHEMA,
1092            &MAX_SECRETS,
1093            &MAX_ROLES,
1094            &MAX_CONTINUAL_TASKS,
1095            &MAX_NETWORK_POLICIES,
1096            &MAX_RULES_PER_NETWORK_POLICY,
1097            &MAX_RESULT_SIZE,
1098            &MAX_COPY_FROM_SIZE,
1099            &ALLOWED_CLUSTER_REPLICA_SIZES,
1100            &upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE,
1101            &upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET,
1102            &upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES,
1103            &upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO,
1104            &upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM,
1105            &upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE,
1106            &upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE,
1107            &upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE,
1108            &upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION,
1109            &upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS,
1110            &upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS,
1111            &upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB,
1112            &upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO,
1113            &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1114            &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES,
1115            &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL,
1116            &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES,
1117            &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION,
1118            &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY,
1119            &STORAGE_STATISTICS_INTERVAL,
1120            &STORAGE_STATISTICS_COLLECTION_INTERVAL,
1121            &STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO,
1122            &STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS,
1123            &PERSIST_FAST_PATH_LIMIT,
1124            &METRICS_RETENTION,
1125            &UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP,
1126            &ENABLE_RBAC_CHECKS,
1127            &PG_SOURCE_CONNECT_TIMEOUT,
1128            &PG_SOURCE_TCP_KEEPALIVES_IDLE,
1129            &PG_SOURCE_TCP_KEEPALIVES_INTERVAL,
1130            &PG_SOURCE_TCP_KEEPALIVES_RETRIES,
1131            &PG_SOURCE_TCP_USER_TIMEOUT,
1132            &PG_SOURCE_TCP_CONFIGURE_SERVER,
1133            &PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT,
1134            &PG_SOURCE_WAL_SENDER_TIMEOUT,
1135            &PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT,
1136            &MYSQL_SOURCE_TCP_KEEPALIVE,
1137            &MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME,
1138            &MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT,
1139            &MYSQL_SOURCE_CONNECT_TIMEOUT,
1140            &SSH_CHECK_INTERVAL,
1141            &SSH_CONNECT_TIMEOUT,
1142            &SSH_KEEPALIVES_IDLE,
1143            &KAFKA_SOCKET_KEEPALIVE,
1144            &KAFKA_SOCKET_TIMEOUT,
1145            &KAFKA_TRANSACTION_TIMEOUT,
1146            &KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT,
1147            &KAFKA_FETCH_METADATA_TIMEOUT,
1148            &KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT,
1149            &ENABLE_LAUNCHDARKLY,
1150            &MAX_CONNECTIONS,
1151            &NETWORK_POLICY,
1152            &SUPERUSER_RESERVED_CONNECTIONS,
1153            &KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES,
1154            &KEEP_N_SINK_STATUS_HISTORY_ENTRIES,
1155            &KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES,
1156            &REPLICA_STATUS_HISTORY_RETENTION_WINDOW,
1157            &ENABLE_STORAGE_SHARD_FINALIZATION,
1158            &ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE,
1159            &ENABLE_DEFAULT_CONNECTION_VALIDATION,
1160            &ENABLE_REDUCE_REDUCTION,
1161            &MIN_TIMESTAMP_INTERVAL,
1162            &MAX_TIMESTAMP_INTERVAL,
1163            &LOGGING_FILTER,
1164            &OPENTELEMETRY_FILTER,
1165            &LOGGING_FILTER_DEFAULTS,
1166            &OPENTELEMETRY_FILTER_DEFAULTS,
1167            &SENTRY_FILTERS,
1168            &WEBHOOKS_SECRETS_CACHING_TTL_SECS,
1169            &COORD_SLOW_MESSAGE_WARN_THRESHOLD,
1170            &grpc_client::CONNECT_TIMEOUT,
1171            &grpc_client::HTTP2_KEEP_ALIVE_INTERVAL,
1172            &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1173            &cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT,
1174            &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY,
1175            &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT,
1176            &cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD,
1177            &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE,
1178            &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW,
1179            &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS,
1180            &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT,
1181            &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY,
1182            &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT,
1183            &cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL,
1184            &cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL,
1185            &cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED,
1186            &cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE,
1187            &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1188            &STATEMENT_LOGGING_MAX_SAMPLE_RATE,
1189            &STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE,
1190            &STATEMENT_LOGGING_TARGET_DATA_RATE,
1191            &STATEMENT_LOGGING_MAX_DATA_CREDIT,
1192            &ENABLE_INTERNAL_STATEMENT_LOGGING,
1193            &OPTIMIZER_STATS_TIMEOUT,
1194            &OPTIMIZER_ONESHOT_STATS_TIMEOUT,
1195            &PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE,
1196            &WEBHOOK_CONCURRENT_REQUEST_LIMIT,
1197            &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE,
1198            &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT,
1199            &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL,
1200            &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER,
1201            &USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION,
1202            &FORCE_SOURCE_TABLE_SYNTAX,
1203            &OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD,
1204            &SCRAM_ITERATIONS,
1205        ];
1206
1207        let dyncfgs = mz_dyncfgs::all_dyncfgs();
1208        let dyncfg_vars: Vec<_> = dyncfgs
1209            .entries()
1210            .map(|cfg| match cfg.default() {
1211                ConfigVal::Bool(default) => {
1212                    VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1213                }
1214                ConfigVal::U32(default) => {
1215                    VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1216                }
1217                ConfigVal::Usize(default) => {
1218                    VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1219                }
1220                ConfigVal::OptUsize(default) => {
1221                    VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1222                }
1223                ConfigVal::F64(default) => {
1224                    VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1225                }
1226                ConfigVal::String(default) => {
1227                    VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1228                }
1229                ConfigVal::Duration(default) => {
1230                    VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1231                }
1232                ConfigVal::Json(default) => {
1233                    VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1234                }
1235            })
1236            .collect();
1237
1238        let vars: BTreeMap<_, _> = system_vars
1239            .into_iter()
1240            // Include all of our feature flags.
1241            .chain(definitions::FEATURE_FLAGS.iter().copied())
1242            // Include the subset of Session variables we allow system defaults for.
1243            .chain(SESSION_SYSTEM_VARS.values().copied())
1244            .cloned()
1245            // Include Persist configs.
1246            .chain(dyncfg_vars)
1247            .map(|var| (var.name, SystemVar::new(var)))
1248            .collect();
1249
1250        let vars = SystemVars {
1251            vars,
1252            callbacks: BTreeMap::new(),
1253            allow_unsafe: false,
1254            dyncfgs,
1255        };
1256
1257        vars
1258    }
1259
1260    pub fn dyncfgs(&self) -> &ConfigSet {
1261        &self.dyncfgs
1262    }
1263
1264    pub fn set_unsafe(mut self, allow_unsafe: bool) -> Self {
1265        self.allow_unsafe = allow_unsafe;
1266        self
1267    }
1268
1269    pub fn allow_unsafe(&self) -> bool {
1270        self.allow_unsafe
1271    }
1272
1273    fn expect_value<V: 'static>(&self, var: &VarDefinition) -> &V {
1274        let val = self
1275            .vars
1276            .get(var.name)
1277            .expect("provided var should be in state");
1278
1279        val.value_dyn()
1280            .as_any()
1281            .downcast_ref::<V>()
1282            .expect("provided var type should matched stored var")
1283    }
1284
1285    fn expect_config_value<V: ConfigType + 'static>(&self, name: &UncasedStr) -> &V {
1286        let val = self
1287            .vars
1288            .get(name)
1289            .unwrap_or_else(|| panic!("provided var {name} should be in state"));
1290
1291        val.value_dyn()
1292            .as_any()
1293            .downcast_ref()
1294            .expect("provided var type should matched stored var")
1295    }
1296
1297    /// Reset all the values to their defaults (preserving
1298    /// defaults from `VarMut::set_default).
1299    pub fn reset_all(&mut self) {
1300        for (_, var) in &mut self.vars {
1301            var.reset();
1302        }
1303    }
1304
1305    /// Returns an iterator over the configuration parameters and their current
1306    /// values on disk.
1307    pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
1308        self.vars
1309            .values()
1310            .map(|v| v.as_var())
1311            .filter(|v| !SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1312    }
1313
1314    /// Returns an iterator over the configuration parameters and their current
1315    /// values on disk. Compared to [`SystemVars::iter`], this should omit vars
1316    /// that shouldn't be synced by SystemParameterFrontend.
1317    pub fn iter_synced(&self) -> impl Iterator<Item = &dyn Var> {
1318        self.iter().filter(|v| v.name() != ENABLE_LAUNCHDARKLY.name)
1319    }
1320
1321    /// Returns an iterator over the configuration parameters that can be overriden per-Session.
1322    pub fn iter_session(&self) -> impl Iterator<Item = &dyn Var> {
1323        self.vars
1324            .values()
1325            .map(|v| v.as_var())
1326            .filter(|v| SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1327    }
1328
1329    /// Returns whether or not this parameter can be modified by a superuser.
1330    pub fn user_modifiable(&self, name: &str) -> bool {
1331        SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(name))
1332            || name == ENABLE_RBAC_CHECKS.name()
1333            || name == NETWORK_POLICY.name()
1334    }
1335
1336    /// Returns a [`Var`] representing the configuration parameter with the
1337    /// specified name.
1338    ///
1339    /// Configuration parameters are matched case insensitively. If no such
1340    /// configuration parameter exists, `get` returns an error.
1341    ///
1342    /// Note that:
1343    /// - If `name` is known at compile time, you should instead use the named
1344    /// accessor to access the variable with its true Rust type. For example,
1345    /// `self.get("max_tables").value()` returns the string `"25"` or the
1346    /// current value, while `self.max_tables()` returns an i32.
1347    ///
1348    /// - This function does not check that the access variable should be
1349    /// visible because of other settings or users. Before or after accessing
1350    /// this method, you should call `Var::visible`.
1351    ///
1352    /// # Errors
1353    ///
1354    /// The call will return an error:
1355    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1356    pub fn get(&self, name: &str) -> Result<&dyn Var, VarError> {
1357        self.vars
1358            .get(UncasedStr::new(name))
1359            .map(|v| v.as_var())
1360            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1361    }
1362
1363    /// Check if the given `values` is the default value for the [`Var`]
1364    /// identified by `name`.
1365    ///
1366    /// Note that this function does not check that the access variable should
1367    /// be visible because of other settings or users. Before or after accessing
1368    /// this method, you should call `Var::visible`.
1369    ///
1370    /// # Errors
1371    ///
1372    /// The call will return an error:
1373    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1374    /// 2. If `values` does not represent a valid [`SystemVars`] value for
1375    ///    `name`.
1376    pub fn is_default(&self, name: &str, input: VarInput) -> Result<bool, VarError> {
1377        self.vars
1378            .get(UncasedStr::new(name))
1379            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1380            .and_then(|v| v.is_default(input))
1381    }
1382
1383    /// Sets the configuration parameter named `name` to the value represented
1384    /// by `input`.
1385    ///
1386    /// Like with [`SystemVars::get`], configuration parameters are matched case
1387    /// insensitively. If `input` is not valid, as determined by the underlying
1388    /// configuration parameter, or if the named configuration parameter does
1389    /// not exist, an error is returned.
1390    ///
1391    /// Return a `bool` value indicating whether the [`Var`] identified by
1392    /// `name` was modified by this call (it won't be if it already had the
1393    /// given `input`).
1394    ///
1395    /// Note that this function does not check that the access variable should
1396    /// be visible because of other settings or users. Before or after accessing
1397    /// this method, you should call `Var::visible`.
1398    ///
1399    /// # Errors
1400    ///
1401    /// The call will return an error:
1402    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1403    /// 2. If `input` does not represent a valid [`SystemVars`] value for
1404    ///    `name`.
1405    pub fn set(&mut self, name: &str, input: VarInput) -> Result<bool, VarError> {
1406        let result = self
1407            .vars
1408            .get_mut(UncasedStr::new(name))
1409            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1410            .and_then(|v| v.set(input))?;
1411        self.notify_callbacks(name);
1412        Ok(result)
1413    }
1414
1415    /// Parses the configuration parameter value represented by `input` named
1416    /// `name`.
1417    ///
1418    /// Like with [`SystemVars::get`], configuration parameters are matched case
1419    /// insensitively. If `input` is not valid, as determined by the underlying
1420    /// configuration parameter, or if the named configuration parameter does
1421    /// not exist, an error is returned.
1422    ///
1423    /// Return a `Box<dyn Value>` that is the result of parsing `input`.
1424    ///
1425    /// Note that this function does not check that the access variable should
1426    /// be visible because of other settings or users. Before or after accessing
1427    /// this method, you should call `Var::visible`.
1428    ///
1429    /// # Errors
1430    ///
1431    /// The call will return an error:
1432    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1433    /// 2. If `input` does not represent a valid [`SystemVars`] value for
1434    ///    `name`.
1435    pub fn parse(&self, name: &str, input: VarInput) -> Result<Box<dyn Value>, VarError> {
1436        self.vars
1437            .get(UncasedStr::new(name))
1438            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1439            .and_then(|v| v.parse(input))
1440    }
1441
1442    /// Set the default for this variable. This is the value this
1443    /// variable will be be `reset` to. If no default is set, the static default in the
1444    /// variable definition is used instead.
1445    ///
1446    /// Note that this function does not check that the access variable should
1447    /// be visible because of other settings or users. Before or after accessing
1448    /// this method, you should call `Var::visible`.
1449    pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
1450        let result = self
1451            .vars
1452            .get_mut(UncasedStr::new(name))
1453            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1454            .and_then(|v| v.set_default(input))?;
1455        self.notify_callbacks(name);
1456        Ok(result)
1457    }
1458
1459    /// Sets the configuration parameter named `name` to its default value.
1460    ///
1461    /// Like with [`SystemVars::get`], configuration parameters are matched case
1462    /// insensitively. If the named configuration parameter does not exist, an
1463    /// error is returned.
1464    ///
1465    /// Return a `bool` value indicating whether the [`Var`] identified by
1466    /// `name` was modified by this call (it won't be if was already reset).
1467    ///
1468    /// Note that this function does not check that the access variable should
1469    /// be visible because of other settings or users. Before or after accessing
1470    /// this method, you should call `Var::visible`.
1471    ///
1472    /// # Errors
1473    ///
1474    /// The call will return an error:
1475    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1476    pub fn reset(&mut self, name: &str) -> Result<bool, VarError> {
1477        let result = self
1478            .vars
1479            .get_mut(UncasedStr::new(name))
1480            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1481            .map(|v| v.reset())?;
1482        self.notify_callbacks(name);
1483        Ok(result)
1484    }
1485
1486    /// Returns a map from each system parameter's name to its default value.
1487    pub fn defaults(&self) -> BTreeMap<String, String> {
1488        self.vars
1489            .iter()
1490            .map(|(name, var)| {
1491                let default = var
1492                    .dynamic_default
1493                    .as_deref()
1494                    .unwrap_or_else(|| var.definition.default_value());
1495                (name.as_str().to_owned(), default.format())
1496            })
1497            .collect()
1498    }
1499
1500    /// Registers a closure that will get called when the value for the
1501    /// specified [`VarDefinition`] changes.
1502    ///
1503    /// The callback is guaranteed to be called at least once.
1504    pub fn register_callback(
1505        &mut self,
1506        var: &VarDefinition,
1507        callback: Arc<dyn Fn(&SystemVars) + Send + Sync>,
1508    ) {
1509        self.callbacks
1510            .entry(var.name().to_string())
1511            .or_default()
1512            .push(callback);
1513        self.notify_callbacks(var.name());
1514    }
1515
1516    /// Notify any external components interested in this variable.
1517    fn notify_callbacks(&self, name: &str) {
1518        // Get the callbacks interested in this variable.
1519        if let Some(callbacks) = self.callbacks.get(name) {
1520            for callback in callbacks {
1521                (callback)(self);
1522            }
1523        }
1524    }
1525
1526    /// Returns the system default for the [`CLUSTER`] session variable. To know the active cluster
1527    /// for the current session, you must check the [`SessionVars`].
1528    pub fn default_cluster(&self) -> String {
1529        self.expect_value::<String>(&CLUSTER).to_owned()
1530    }
1531
1532    /// Returns the value of the `max_kafka_connections` configuration parameter.
1533    pub fn max_kafka_connections(&self) -> u32 {
1534        *self.expect_value(&MAX_KAFKA_CONNECTIONS)
1535    }
1536
1537    /// Returns the value of the `max_postgres_connections` configuration parameter.
1538    pub fn max_postgres_connections(&self) -> u32 {
1539        *self.expect_value(&MAX_POSTGRES_CONNECTIONS)
1540    }
1541
1542    /// Returns the value of the `max_mysql_connections` configuration parameter.
1543    pub fn max_mysql_connections(&self) -> u32 {
1544        *self.expect_value(&MAX_MYSQL_CONNECTIONS)
1545    }
1546
1547    /// Returns the value of the `max_sql_server_connections` configuration parameter.
1548    pub fn max_sql_server_connections(&self) -> u32 {
1549        *self.expect_value(&MAX_SQL_SERVER_CONNECTIONS)
1550    }
1551
1552    /// Returns the value of the `max_aws_privatelink_connections` configuration parameter.
1553    pub fn max_aws_privatelink_connections(&self) -> u32 {
1554        *self.expect_value(&MAX_AWS_PRIVATELINK_CONNECTIONS)
1555    }
1556
1557    /// Returns the value of the `max_tables` configuration parameter.
1558    pub fn max_tables(&self) -> u32 {
1559        *self.expect_value(&MAX_TABLES)
1560    }
1561
1562    /// Returns the value of the `max_sources` configuration parameter.
1563    pub fn max_sources(&self) -> u32 {
1564        *self.expect_value(&MAX_SOURCES)
1565    }
1566
1567    /// Returns the value of the `max_sinks` configuration parameter.
1568    pub fn max_sinks(&self) -> u32 {
1569        *self.expect_value(&MAX_SINKS)
1570    }
1571
1572    /// Returns the value of the `max_materialized_views` configuration parameter.
1573    pub fn max_materialized_views(&self) -> u32 {
1574        *self.expect_value(&MAX_MATERIALIZED_VIEWS)
1575    }
1576
1577    /// Returns the value of the `max_clusters` configuration parameter.
1578    pub fn max_clusters(&self) -> u32 {
1579        *self.expect_value(&MAX_CLUSTERS)
1580    }
1581
1582    /// Returns the value of the `max_replicas_per_cluster` configuration parameter.
1583    pub fn max_replicas_per_cluster(&self) -> u32 {
1584        *self.expect_value(&MAX_REPLICAS_PER_CLUSTER)
1585    }
1586
1587    /// Returns the value of the `max_credit_consumption_rate` configuration parameter.
1588    pub fn max_credit_consumption_rate(&self) -> Numeric {
1589        *self.expect_value(&MAX_CREDIT_CONSUMPTION_RATE)
1590    }
1591
1592    /// Returns the value of the `max_databases` configuration parameter.
1593    pub fn max_databases(&self) -> u32 {
1594        *self.expect_value(&MAX_DATABASES)
1595    }
1596
1597    /// Returns the value of the `max_schemas_per_database` configuration parameter.
1598    pub fn max_schemas_per_database(&self) -> u32 {
1599        *self.expect_value(&MAX_SCHEMAS_PER_DATABASE)
1600    }
1601
1602    /// Returns the value of the `max_objects_per_schema` configuration parameter.
1603    pub fn max_objects_per_schema(&self) -> u32 {
1604        *self.expect_value(&MAX_OBJECTS_PER_SCHEMA)
1605    }
1606
1607    /// Returns the value of the `max_secrets` configuration parameter.
1608    pub fn max_secrets(&self) -> u32 {
1609        *self.expect_value(&MAX_SECRETS)
1610    }
1611
1612    /// Returns the value of the `max_roles` configuration parameter.
1613    pub fn max_roles(&self) -> u32 {
1614        *self.expect_value(&MAX_ROLES)
1615    }
1616
1617    /// Returns the value of the `max_continual_tasks` configuration parameter.
1618    pub fn max_continual_tasks(&self) -> u32 {
1619        *self.expect_value(&MAX_CONTINUAL_TASKS)
1620    }
1621
1622    /// Returns the value of the `max_network_policies` configuration parameter.
1623    pub fn max_network_policies(&self) -> u32 {
1624        *self.expect_value(&MAX_NETWORK_POLICIES)
1625    }
1626
1627    /// Returns the value of the `max_network_policies` configuration parameter.
1628    pub fn max_rules_per_network_policy(&self) -> u32 {
1629        *self.expect_value(&MAX_RULES_PER_NETWORK_POLICY)
1630    }
1631
1632    /// Returns the value of the `max_result_size` configuration parameter.
1633    pub fn max_result_size(&self) -> u64 {
1634        self.expect_value::<ByteSize>(&MAX_RESULT_SIZE).as_bytes()
1635    }
1636
1637    /// Returns the value of the `max_copy_from_size` configuration parameter.
1638    pub fn max_copy_from_size(&self) -> u32 {
1639        *self.expect_value(&MAX_COPY_FROM_SIZE)
1640    }
1641
1642    /// Returns the value of the `allowed_cluster_replica_sizes` configuration parameter.
1643    pub fn allowed_cluster_replica_sizes(&self) -> Vec<String> {
1644        self.expect_value::<Vec<Ident>>(&ALLOWED_CLUSTER_REPLICA_SIZES)
1645            .into_iter()
1646            .map(|s| s.as_str().into())
1647            .collect()
1648    }
1649
1650    /// Returns the value of the `default_cluster_replication_factor` configuration parameter.
1651    pub fn default_cluster_replication_factor(&self) -> u32 {
1652        *self.expect_value::<u32>(&DEFAULT_CLUSTER_REPLICATION_FACTOR)
1653    }
1654
1655    pub fn upsert_rocksdb_compaction_style(&self) -> mz_rocksdb_types::config::CompactionStyle {
1656        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE)
1657    }
1658
1659    pub fn upsert_rocksdb_optimize_compaction_memtable_budget(&self) -> usize {
1660        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET)
1661    }
1662
1663    pub fn upsert_rocksdb_level_compaction_dynamic_level_bytes(&self) -> bool {
1664        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES)
1665    }
1666
1667    pub fn upsert_rocksdb_universal_compaction_ratio(&self) -> i32 {
1668        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO)
1669    }
1670
1671    pub fn upsert_rocksdb_parallelism(&self) -> Option<i32> {
1672        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM)
1673    }
1674
1675    pub fn upsert_rocksdb_compression_type(&self) -> mz_rocksdb_types::config::CompressionType {
1676        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE)
1677    }
1678
1679    pub fn upsert_rocksdb_bottommost_compression_type(
1680        &self,
1681    ) -> mz_rocksdb_types::config::CompressionType {
1682        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE)
1683    }
1684
1685    pub fn upsert_rocksdb_batch_size(&self) -> usize {
1686        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE)
1687    }
1688
1689    pub fn upsert_rocksdb_retry_duration(&self) -> Duration {
1690        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION)
1691    }
1692
1693    pub fn upsert_rocksdb_stats_log_interval_seconds(&self) -> u32 {
1694        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS)
1695    }
1696
1697    pub fn upsert_rocksdb_stats_persist_interval_seconds(&self) -> u32 {
1698        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS)
1699    }
1700
1701    pub fn upsert_rocksdb_point_lookup_block_cache_size_mb(&self) -> Option<u32> {
1702        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB)
1703    }
1704
1705    pub fn upsert_rocksdb_shrink_allocated_buffers_by_ratio(&self) -> usize {
1706        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO)
1707    }
1708
1709    pub fn upsert_rocksdb_write_buffer_manager_cluster_memory_fraction(&self) -> Option<Numeric> {
1710        *self.expect_value(
1711            &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1712        )
1713    }
1714
1715    pub fn upsert_rocksdb_write_buffer_manager_memory_bytes(&self) -> Option<usize> {
1716        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES)
1717    }
1718
1719    pub fn upsert_rocksdb_write_buffer_manager_allow_stall(&self) -> bool {
1720        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL)
1721    }
1722
1723    pub fn persist_fast_path_limit(&self) -> usize {
1724        *self.expect_value(&PERSIST_FAST_PATH_LIMIT)
1725    }
1726
1727    /// Returns the `pg_source_connect_timeout` configuration parameter.
1728    pub fn pg_source_connect_timeout(&self) -> Duration {
1729        *self.expect_value(&PG_SOURCE_CONNECT_TIMEOUT)
1730    }
1731
1732    /// Returns the `pg_source_tcp_keepalives_retries` configuration parameter.
1733    pub fn pg_source_tcp_keepalives_retries(&self) -> u32 {
1734        *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_RETRIES)
1735    }
1736
1737    /// Returns the `pg_source_tcp_keepalives_idle` configuration parameter.
1738    pub fn pg_source_tcp_keepalives_idle(&self) -> Duration {
1739        *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_IDLE)
1740    }
1741
1742    /// Returns the `pg_source_tcp_keepalives_interval` configuration parameter.
1743    pub fn pg_source_tcp_keepalives_interval(&self) -> Duration {
1744        *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_INTERVAL)
1745    }
1746
1747    /// Returns the `pg_source_tcp_user_timeout` configuration parameter.
1748    pub fn pg_source_tcp_user_timeout(&self) -> Duration {
1749        *self.expect_value(&PG_SOURCE_TCP_USER_TIMEOUT)
1750    }
1751
1752    /// Returns the `pg_source_tcp_configure_server` configuration parameter.
1753    pub fn pg_source_tcp_configure_server(&self) -> bool {
1754        *self.expect_value(&PG_SOURCE_TCP_CONFIGURE_SERVER)
1755    }
1756
1757    /// Returns the `pg_source_snapshot_statement_timeout` configuration parameter.
1758    pub fn pg_source_snapshot_statement_timeout(&self) -> Duration {
1759        *self.expect_value(&PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT)
1760    }
1761
1762    /// Returns the `pg_source_wal_sender_timeout` configuration parameter.
1763    pub fn pg_source_wal_sender_timeout(&self) -> Option<Duration> {
1764        *self.expect_value(&PG_SOURCE_WAL_SENDER_TIMEOUT)
1765    }
1766
1767    /// Returns the `pg_source_snapshot_collect_strict_count` configuration parameter.
1768    pub fn pg_source_snapshot_collect_strict_count(&self) -> bool {
1769        *self.expect_value(&PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT)
1770    }
1771
1772    /// Returns the `mysql_source_tcp_keepalive` configuration parameter.
1773    pub fn mysql_source_tcp_keepalive(&self) -> Duration {
1774        *self.expect_value(&MYSQL_SOURCE_TCP_KEEPALIVE)
1775    }
1776
1777    /// Returns the `mysql_source_snapshot_max_execution_time` configuration parameter.
1778    pub fn mysql_source_snapshot_max_execution_time(&self) -> Duration {
1779        *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME)
1780    }
1781
1782    /// Returns the `mysql_source_snapshot_lock_wait_timeout` configuration parameter.
1783    pub fn mysql_source_snapshot_lock_wait_timeout(&self) -> Duration {
1784        *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT)
1785    }
1786
1787    /// Returns the `mysql_source_connect_timeout` configuration parameter.
1788    pub fn mysql_source_connect_timeout(&self) -> Duration {
1789        *self.expect_value(&MYSQL_SOURCE_CONNECT_TIMEOUT)
1790    }
1791
1792    /// Returns the `ssh_check_interval` configuration parameter.
1793    pub fn ssh_check_interval(&self) -> Duration {
1794        *self.expect_value(&SSH_CHECK_INTERVAL)
1795    }
1796
1797    /// Returns the `ssh_connect_timeout` configuration parameter.
1798    pub fn ssh_connect_timeout(&self) -> Duration {
1799        *self.expect_value(&SSH_CONNECT_TIMEOUT)
1800    }
1801
1802    /// Returns the `ssh_keepalives_idle` configuration parameter.
1803    pub fn ssh_keepalives_idle(&self) -> Duration {
1804        *self.expect_value(&SSH_KEEPALIVES_IDLE)
1805    }
1806
1807    /// Returns the `kafka_socket_keepalive` configuration parameter.
1808    pub fn kafka_socket_keepalive(&self) -> bool {
1809        *self.expect_value(&KAFKA_SOCKET_KEEPALIVE)
1810    }
1811
1812    /// Returns the `kafka_socket_timeout` configuration parameter.
1813    pub fn kafka_socket_timeout(&self) -> Option<Duration> {
1814        *self.expect_value(&KAFKA_SOCKET_TIMEOUT)
1815    }
1816
1817    /// Returns the `kafka_transaction_timeout` configuration parameter.
1818    pub fn kafka_transaction_timeout(&self) -> Duration {
1819        *self.expect_value(&KAFKA_TRANSACTION_TIMEOUT)
1820    }
1821
1822    /// Returns the `kafka_socket_connection_setup_timeout` configuration parameter.
1823    pub fn kafka_socket_connection_setup_timeout(&self) -> Duration {
1824        *self.expect_value(&KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT)
1825    }
1826
1827    /// Returns the `kafka_fetch_metadata_timeout` configuration parameter.
1828    pub fn kafka_fetch_metadata_timeout(&self) -> Duration {
1829        *self.expect_value(&KAFKA_FETCH_METADATA_TIMEOUT)
1830    }
1831
1832    /// Returns the `kafka_progress_record_fetch_timeout` configuration parameter.
1833    pub fn kafka_progress_record_fetch_timeout(&self) -> Option<Duration> {
1834        *self.expect_value(&KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT)
1835    }
1836
1837    /// Returns the `crdb_connect_timeout` configuration parameter.
1838    pub fn crdb_connect_timeout(&self) -> Duration {
1839        *self.expect_config_value(UncasedStr::new(
1840            mz_persist_client::cfg::CRDB_CONNECT_TIMEOUT.name(),
1841        ))
1842    }
1843
1844    /// Returns the `crdb_tcp_user_timeout` configuration parameter.
1845    pub fn crdb_tcp_user_timeout(&self) -> Duration {
1846        *self.expect_config_value(UncasedStr::new(
1847            mz_persist_client::cfg::CRDB_TCP_USER_TIMEOUT.name(),
1848        ))
1849    }
1850
1851    /// Returns the `storage_dataflow_max_inflight_bytes` configuration parameter.
1852    pub fn storage_dataflow_max_inflight_bytes(&self) -> Option<usize> {
1853        *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES)
1854    }
1855
1856    /// Returns the `storage_dataflow_max_inflight_bytes_to_cluster_size_fraction` configuration parameter.
1857    pub fn storage_dataflow_max_inflight_bytes_to_cluster_size_fraction(&self) -> Option<Numeric> {
1858        *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION)
1859    }
1860
1861    /// Returns the `storage_shrink_upsert_unused_buffers_by_ratio` configuration parameter.
1862    pub fn storage_shrink_upsert_unused_buffers_by_ratio(&self) -> usize {
1863        *self.expect_value(&STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO)
1864    }
1865
1866    /// Returns the `storage_dataflow_max_inflight_bytes_disk_only` configuration parameter.
1867    pub fn storage_dataflow_max_inflight_bytes_disk_only(&self) -> bool {
1868        *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY)
1869    }
1870
1871    /// Returns the `storage_statistics_interval` configuration parameter.
1872    pub fn storage_statistics_interval(&self) -> Duration {
1873        *self.expect_value(&STORAGE_STATISTICS_INTERVAL)
1874    }
1875
1876    /// Returns the `storage_statistics_collection_interval` configuration parameter.
1877    pub fn storage_statistics_collection_interval(&self) -> Duration {
1878        *self.expect_value(&STORAGE_STATISTICS_COLLECTION_INTERVAL)
1879    }
1880
1881    /// Returns the `storage_record_source_sink_namespaced_errors` configuration parameter.
1882    pub fn storage_record_source_sink_namespaced_errors(&self) -> bool {
1883        *self.expect_value(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS)
1884    }
1885
1886    /// Returns the `persist_stats_filter_enabled` configuration parameter.
1887    pub fn persist_stats_filter_enabled(&self) -> bool {
1888        *self.expect_config_value(UncasedStr::new(
1889            mz_persist_client::stats::STATS_FILTER_ENABLED.name(),
1890        ))
1891    }
1892
1893    pub fn scram_iterations(&self) -> NonZeroU32 {
1894        *self.expect_value(&SCRAM_ITERATIONS)
1895    }
1896
1897    pub fn dyncfg_updates(&self) -> ConfigUpdates {
1898        let mut updates = ConfigUpdates::default();
1899        for entry in self.dyncfgs.entries() {
1900            let name = UncasedStr::new(entry.name());
1901            let val = match entry.val() {
1902                ConfigVal::Bool(_) => ConfigVal::from(*self.expect_config_value::<bool>(name)),
1903                ConfigVal::U32(_) => ConfigVal::from(*self.expect_config_value::<u32>(name)),
1904                ConfigVal::Usize(_) => ConfigVal::from(*self.expect_config_value::<usize>(name)),
1905                ConfigVal::OptUsize(_) => {
1906                    ConfigVal::from(*self.expect_config_value::<Option<usize>>(name))
1907                }
1908                ConfigVal::F64(_) => ConfigVal::from(*self.expect_config_value::<f64>(name)),
1909                ConfigVal::String(_) => {
1910                    ConfigVal::from(self.expect_config_value::<String>(name).clone())
1911                }
1912                ConfigVal::Duration(_) => {
1913                    ConfigVal::from(*self.expect_config_value::<Duration>(name))
1914                }
1915                ConfigVal::Json(_) => {
1916                    ConfigVal::from(self.expect_config_value::<serde_json::Value>(name).clone())
1917                }
1918            };
1919            updates.add_dynamic(entry.name(), val);
1920        }
1921        updates.apply(&self.dyncfgs);
1922        updates
1923    }
1924
1925    /// Returns the `metrics_retention` configuration parameter.
1926    pub fn metrics_retention(&self) -> Duration {
1927        *self.expect_value(&METRICS_RETENTION)
1928    }
1929
1930    /// Returns the `unsafe_mock_audit_event_timestamp` configuration parameter.
1931    pub fn unsafe_mock_audit_event_timestamp(&self) -> Option<mz_repr::Timestamp> {
1932        *self.expect_value(&UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP)
1933    }
1934
1935    /// Returns the `enable_rbac_checks` configuration parameter.
1936    pub fn enable_rbac_checks(&self) -> bool {
1937        *self.expect_value(&ENABLE_RBAC_CHECKS)
1938    }
1939
1940    /// Returns the `max_connections` configuration parameter.
1941    pub fn max_connections(&self) -> u32 {
1942        *self.expect_value(&MAX_CONNECTIONS)
1943    }
1944
1945    pub fn default_network_policy_name(&self) -> String {
1946        self.expect_value::<String>(&NETWORK_POLICY).clone()
1947    }
1948
1949    /// Returns the `superuser_reserved_connections` configuration parameter.
1950    pub fn superuser_reserved_connections(&self) -> u32 {
1951        *self.expect_value(&SUPERUSER_RESERVED_CONNECTIONS)
1952    }
1953
1954    pub fn keep_n_source_status_history_entries(&self) -> usize {
1955        *self.expect_value(&KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES)
1956    }
1957
1958    pub fn keep_n_sink_status_history_entries(&self) -> usize {
1959        *self.expect_value(&KEEP_N_SINK_STATUS_HISTORY_ENTRIES)
1960    }
1961
1962    pub fn keep_n_privatelink_status_history_entries(&self) -> usize {
1963        *self.expect_value(&KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES)
1964    }
1965
1966    pub fn replica_status_history_retention_window(&self) -> Duration {
1967        *self.expect_value(&REPLICA_STATUS_HISTORY_RETENTION_WINDOW)
1968    }
1969
1970    /// Returns the `enable_storage_shard_finalization` configuration parameter.
1971    pub fn enable_storage_shard_finalization(&self) -> bool {
1972        *self.expect_value(&ENABLE_STORAGE_SHARD_FINALIZATION)
1973    }
1974
1975    pub fn enable_consolidate_after_union_negate(&self) -> bool {
1976        *self.expect_value(&ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE)
1977    }
1978
1979    pub fn enable_reduce_reduction(&self) -> bool {
1980        *self.expect_value(&ENABLE_REDUCE_REDUCTION)
1981    }
1982
1983    /// Returns the `enable_default_connection_validation` configuration parameter.
1984    pub fn enable_default_connection_validation(&self) -> bool {
1985        *self.expect_value(&ENABLE_DEFAULT_CONNECTION_VALIDATION)
1986    }
1987
1988    /// Returns the `min_timestamp_interval` configuration parameter.
1989    pub fn min_timestamp_interval(&self) -> Duration {
1990        *self.expect_value(&MIN_TIMESTAMP_INTERVAL)
1991    }
1992    /// Returns the `max_timestamp_interval` configuration parameter.
1993    pub fn max_timestamp_interval(&self) -> Duration {
1994        *self.expect_value(&MAX_TIMESTAMP_INTERVAL)
1995    }
1996
1997    pub fn logging_filter(&self) -> CloneableEnvFilter {
1998        self.expect_value::<CloneableEnvFilter>(&LOGGING_FILTER)
1999            .clone()
2000    }
2001
2002    pub fn opentelemetry_filter(&self) -> CloneableEnvFilter {
2003        self.expect_value::<CloneableEnvFilter>(&OPENTELEMETRY_FILTER)
2004            .clone()
2005    }
2006
2007    pub fn logging_filter_defaults(&self) -> Vec<SerializableDirective> {
2008        self.expect_value::<Vec<SerializableDirective>>(&LOGGING_FILTER_DEFAULTS)
2009            .clone()
2010    }
2011
2012    pub fn opentelemetry_filter_defaults(&self) -> Vec<SerializableDirective> {
2013        self.expect_value::<Vec<SerializableDirective>>(&OPENTELEMETRY_FILTER_DEFAULTS)
2014            .clone()
2015    }
2016
2017    pub fn sentry_filters(&self) -> Vec<SerializableDirective> {
2018        self.expect_value::<Vec<SerializableDirective>>(&SENTRY_FILTERS)
2019            .clone()
2020    }
2021
2022    pub fn webhooks_secrets_caching_ttl_secs(&self) -> usize {
2023        *self.expect_value(&WEBHOOKS_SECRETS_CACHING_TTL_SECS)
2024    }
2025
2026    pub fn coord_slow_message_warn_threshold(&self) -> Duration {
2027        *self.expect_value(&COORD_SLOW_MESSAGE_WARN_THRESHOLD)
2028    }
2029
2030    pub fn grpc_client_http2_keep_alive_interval(&self) -> Duration {
2031        *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_INTERVAL)
2032    }
2033
2034    pub fn grpc_client_http2_keep_alive_timeout(&self) -> Duration {
2035        *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT)
2036    }
2037
2038    pub fn grpc_connect_timeout(&self) -> Duration {
2039        *self.expect_value(&grpc_client::CONNECT_TIMEOUT)
2040    }
2041
2042    pub fn cluster_multi_process_replica_az_affinity_weight(&self) -> Option<i32> {
2043        *self.expect_value(&cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT)
2044    }
2045
2046    pub fn cluster_soften_replication_anti_affinity(&self) -> bool {
2047        *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY)
2048    }
2049
2050    pub fn cluster_soften_replication_anti_affinity_weight(&self) -> i32 {
2051        *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT)
2052    }
2053
2054    pub fn cluster_enable_topology_spread(&self) -> bool {
2055        *self.expect_value(&cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD)
2056    }
2057
2058    pub fn cluster_topology_spread_ignore_non_singular_scale(&self) -> bool {
2059        *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE)
2060    }
2061
2062    pub fn cluster_topology_spread_max_skew(&self) -> i32 {
2063        *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW)
2064    }
2065
2066    pub fn cluster_topology_spread_set_min_domains(&self) -> Option<i32> {
2067        *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS)
2068    }
2069
2070    pub fn cluster_topology_spread_soft(&self) -> bool {
2071        *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT)
2072    }
2073
2074    pub fn cluster_soften_az_affinity(&self) -> bool {
2075        *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY)
2076    }
2077
2078    pub fn cluster_soften_az_affinity_weight(&self) -> i32 {
2079        *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT)
2080    }
2081
2082    pub fn cluster_alter_check_ready_interval(&self) -> Duration {
2083        *self.expect_value(&cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL)
2084    }
2085
2086    pub fn cluster_check_scheduling_policies_interval(&self) -> Duration {
2087        *self.expect_value(&cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL)
2088    }
2089
2090    pub fn cluster_security_context_enabled(&self) -> bool {
2091        *self.expect_value(&cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED)
2092    }
2093
2094    pub fn cluster_refresh_mv_compaction_estimate(&self) -> Duration {
2095        *self.expect_value(&cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE)
2096    }
2097
2098    /// Returns the `privatelink_status_update_quota_per_minute` configuration parameter.
2099    pub fn privatelink_status_update_quota_per_minute(&self) -> u32 {
2100        *self.expect_value(&PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE)
2101    }
2102
2103    pub fn statement_logging_target_data_rate(&self) -> Option<usize> {
2104        *self.expect_value(&STATEMENT_LOGGING_TARGET_DATA_RATE)
2105    }
2106
2107    pub fn statement_logging_max_data_credit(&self) -> Option<usize> {
2108        *self.expect_value(&STATEMENT_LOGGING_MAX_DATA_CREDIT)
2109    }
2110
2111    /// Returns the `statement_logging_max_sample_rate` configuration parameter.
2112    pub fn statement_logging_max_sample_rate(&self) -> Numeric {
2113        *self.expect_value(&STATEMENT_LOGGING_MAX_SAMPLE_RATE)
2114    }
2115
2116    /// Returns the `statement_logging_default_sample_rate` configuration parameter.
2117    pub fn statement_logging_default_sample_rate(&self) -> Numeric {
2118        *self.expect_value(&STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE)
2119    }
2120
2121    /// Returns the `enable_internal_statement_logging` configuration parameter.
2122    pub fn enable_internal_statement_logging(&self) -> bool {
2123        *self.expect_value(&ENABLE_INTERNAL_STATEMENT_LOGGING)
2124    }
2125
2126    /// Returns the `optimizer_stats_timeout` configuration parameter.
2127    pub fn optimizer_stats_timeout(&self) -> Duration {
2128        *self.expect_value(&OPTIMIZER_STATS_TIMEOUT)
2129    }
2130
2131    /// Returns the `optimizer_oneshot_stats_timeout` configuration parameter.
2132    pub fn optimizer_oneshot_stats_timeout(&self) -> Duration {
2133        *self.expect_value(&OPTIMIZER_ONESHOT_STATS_TIMEOUT)
2134    }
2135
2136    /// Returns the `webhook_concurrent_request_limit` configuration parameter.
2137    pub fn webhook_concurrent_request_limit(&self) -> usize {
2138        *self.expect_value(&WEBHOOK_CONCURRENT_REQUEST_LIMIT)
2139    }
2140
2141    /// Returns the `pg_timestamp_oracle_connection_pool_max_size` configuration parameter.
2142    pub fn pg_timestamp_oracle_connection_pool_max_size(&self) -> usize {
2143        *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE)
2144    }
2145
2146    /// Returns the `pg_timestamp_oracle_connection_pool_max_wait` configuration parameter.
2147    pub fn pg_timestamp_oracle_connection_pool_max_wait(&self) -> Option<Duration> {
2148        *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT)
2149    }
2150
2151    /// Returns the `pg_timestamp_oracle_connection_pool_ttl` configuration parameter.
2152    pub fn pg_timestamp_oracle_connection_pool_ttl(&self) -> Duration {
2153        *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL)
2154    }
2155
2156    /// Returns the `pg_timestamp_oracle_connection_pool_ttl_stagger` configuration parameter.
2157    pub fn pg_timestamp_oracle_connection_pool_ttl_stagger(&self) -> Duration {
2158        *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER)
2159    }
2160
2161    /// Returns the `user_storage_managed_collections_batch_duration` configuration parameter.
2162    pub fn user_storage_managed_collections_batch_duration(&self) -> Duration {
2163        *self.expect_value(&USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION)
2164    }
2165
2166    pub fn force_source_table_syntax(&self) -> bool {
2167        *self.expect_value(&FORCE_SOURCE_TABLE_SYNTAX)
2168    }
2169
2170    pub fn optimizer_e2e_latency_warning_threshold(&self) -> Duration {
2171        *self.expect_value(&OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD)
2172    }
2173
2174    /// Returns whether the named variable is a controller configuration parameter.
2175    pub fn is_controller_config_var(&self, name: &str) -> bool {
2176        self.is_dyncfg_var(name)
2177    }
2178
2179    /// Returns whether the named variable is a compute configuration parameter
2180    /// (things that go in `ComputeParameters` and are sent to replicas via `UpdateConfiguration`
2181    /// commands).
2182    pub fn is_compute_config_var(&self, name: &str) -> bool {
2183        name == MAX_RESULT_SIZE.name() || self.is_dyncfg_var(name) || is_tracing_var(name)
2184    }
2185
2186    /// Returns whether the named variable is a metrics configuration parameter
2187    pub fn is_metrics_config_var(&self, name: &str) -> bool {
2188        self.is_dyncfg_var(name)
2189    }
2190
2191    /// Returns whether the named variable is a storage configuration parameter.
2192    pub fn is_storage_config_var(&self, name: &str) -> bool {
2193        name == PG_SOURCE_CONNECT_TIMEOUT.name()
2194            || name == PG_SOURCE_TCP_KEEPALIVES_IDLE.name()
2195            || name == PG_SOURCE_TCP_KEEPALIVES_INTERVAL.name()
2196            || name == PG_SOURCE_TCP_KEEPALIVES_RETRIES.name()
2197            || name == PG_SOURCE_TCP_USER_TIMEOUT.name()
2198            || name == PG_SOURCE_TCP_CONFIGURE_SERVER.name()
2199            || name == PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT.name()
2200            || name == PG_SOURCE_WAL_SENDER_TIMEOUT.name()
2201            || name == PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT.name()
2202            || name == MYSQL_SOURCE_TCP_KEEPALIVE.name()
2203            || name == MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME.name()
2204            || name == MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT.name()
2205            || name == MYSQL_SOURCE_CONNECT_TIMEOUT.name()
2206            || name == ENABLE_STORAGE_SHARD_FINALIZATION.name()
2207            || name == SSH_CHECK_INTERVAL.name()
2208            || name == SSH_CONNECT_TIMEOUT.name()
2209            || name == SSH_KEEPALIVES_IDLE.name()
2210            || name == KAFKA_SOCKET_KEEPALIVE.name()
2211            || name == KAFKA_SOCKET_TIMEOUT.name()
2212            || name == KAFKA_TRANSACTION_TIMEOUT.name()
2213            || name == KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT.name()
2214            || name == KAFKA_FETCH_METADATA_TIMEOUT.name()
2215            || name == KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT.name()
2216            || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES.name()
2217            || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION.name()
2218            || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY.name()
2219            || name == STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO.name()
2220            || name == STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS.name()
2221            || name == STORAGE_STATISTICS_INTERVAL.name()
2222            || name == STORAGE_STATISTICS_COLLECTION_INTERVAL.name()
2223            || name == USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION.name()
2224            || is_upsert_rocksdb_config_var(name)
2225            || self.is_dyncfg_var(name)
2226            || is_tracing_var(name)
2227    }
2228
2229    /// Returns whether the named variable is a dyncfg configuration parameter.
2230    fn is_dyncfg_var(&self, name: &str) -> bool {
2231        self.dyncfgs.entries().any(|e| name == e.name())
2232    }
2233}
2234
2235pub fn is_tracing_var(name: &str) -> bool {
2236    name == LOGGING_FILTER.name()
2237        || name == LOGGING_FILTER_DEFAULTS.name()
2238        || name == OPENTELEMETRY_FILTER.name()
2239        || name == OPENTELEMETRY_FILTER_DEFAULTS.name()
2240        || name == SENTRY_FILTERS.name()
2241}
2242
2243/// Returns whether the named variable is a caching configuration parameter.
2244pub fn is_secrets_caching_var(name: &str) -> bool {
2245    name == WEBHOOKS_SECRETS_CACHING_TTL_SECS.name()
2246}
2247
2248fn is_upsert_rocksdb_config_var(name: &str) -> bool {
2249    name == upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE.name()
2250        || name == upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET.name()
2251        || name == upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES.name()
2252        || name == upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO.name()
2253        || name == upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM.name()
2254        || name == upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE.name()
2255        || name == upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE.name()
2256        || name == upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE.name()
2257        || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS.name()
2258        || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS.name()
2259        || name == upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB.name()
2260        || name == upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO.name()
2261}
2262
2263/// Returns whether the named variable is a Postgres/CRDB timestamp oracle
2264/// configuration parameter.
2265pub fn is_pg_timestamp_oracle_config_var(name: &str) -> bool {
2266    name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE.name()
2267        || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT.name()
2268        || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL.name()
2269        || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER.name()
2270        || name == CRDB_CONNECT_TIMEOUT.name()
2271        || name == CRDB_TCP_USER_TIMEOUT.name()
2272}
2273
2274/// Returns whether the named variable is a cluster scheduling config
2275pub fn is_cluster_scheduling_var(name: &str) -> bool {
2276    name == cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT.name()
2277        || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY.name()
2278        || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT.name()
2279        || name == cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD.name()
2280        || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE.name()
2281        || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW.name()
2282        || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT.name()
2283        || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY.name()
2284        || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT.name()
2285}
2286
2287/// Returns whether the named variable is an HTTP server related config var.
2288pub fn is_http_config_var(name: &str) -> bool {
2289    name == WEBHOOK_CONCURRENT_REQUEST_LIMIT.name()
2290}
2291
2292/// Set of [`SystemVar`]s that can also get set at a per-Session level.
2293///
2294/// TODO(parkmycar): Instead of a separate list, make this a field on VarDefinition.
2295static SESSION_SYSTEM_VARS: LazyLock<BTreeMap<&'static UncasedStr, &'static VarDefinition>> =
2296    LazyLock::new(|| {
2297        [
2298            &APPLICATION_NAME,
2299            &CLIENT_ENCODING,
2300            &CLIENT_MIN_MESSAGES,
2301            &CLUSTER,
2302            &CLUSTER_REPLICA,
2303            &DEFAULT_CLUSTER_REPLICATION_FACTOR,
2304            &CURRENT_OBJECT_MISSING_WARNINGS,
2305            &DATABASE,
2306            &DATE_STYLE,
2307            &EXTRA_FLOAT_DIGITS,
2308            &INTEGER_DATETIMES,
2309            &INTERVAL_STYLE,
2310            &REAL_TIME_RECENCY_TIMEOUT,
2311            &SEARCH_PATH,
2312            &STANDARD_CONFORMING_STRINGS,
2313            &STATEMENT_TIMEOUT,
2314            &IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
2315            &TIMEZONE,
2316            &TRANSACTION_ISOLATION,
2317            &MAX_QUERY_RESULT_SIZE,
2318        ]
2319        .into_iter()
2320        .map(|var| (UncasedStr::new(var.name()), var))
2321        .collect()
2322    });
2323
2324// Provides a wrapper to express that a particular `ServerVar` is meant to be used as a feature
2325/// flag.
2326#[derive(Debug)]
2327pub struct FeatureFlag {
2328    pub flag: &'static VarDefinition,
2329    pub feature_desc: &'static str,
2330}
2331
2332impl FeatureFlag {
2333    /// Returns an error unless the feature flag is enabled in the provided
2334    /// `system_vars`.
2335    pub fn require(&'static self, system_vars: &SystemVars) -> Result<(), VarError> {
2336        match *system_vars.expect_value::<bool>(self.flag) {
2337            true => Ok(()),
2338            false => Err(VarError::RequiresFeatureFlag { feature_flag: self }),
2339        }
2340    }
2341}
2342
2343impl PartialEq for FeatureFlag {
2344    fn eq(&self, other: &FeatureFlag) -> bool {
2345        self.flag.name() == other.flag.name()
2346    }
2347}
2348
2349impl Eq for FeatureFlag {}
2350
2351impl Var for MzVersion {
2352    fn name(&self) -> &'static str {
2353        MZ_VERSION_NAME.as_str()
2354    }
2355
2356    fn value(&self) -> String {
2357        self.build_info
2358            .human_version(self.helm_chart_version.clone())
2359    }
2360
2361    fn description(&self) -> &'static str {
2362        "Shows the Materialize server version (Materialize)."
2363    }
2364
2365    fn type_name(&self) -> Cow<'static, str> {
2366        String::type_name()
2367    }
2368
2369    fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2370        Ok(())
2371    }
2372}
2373
2374impl Var for User {
2375    fn name(&self) -> &'static str {
2376        IS_SUPERUSER_NAME.as_str()
2377    }
2378
2379    fn value(&self) -> String {
2380        self.is_superuser().format()
2381    }
2382
2383    fn description(&self) -> &'static str {
2384        "Reports whether the current session is a superuser (PostgreSQL)."
2385    }
2386
2387    fn type_name(&self) -> Cow<'static, str> {
2388        bool::type_name()
2389    }
2390
2391    fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2392        Ok(())
2393    }
2394}