Skip to main content

mz_sql/session/
vars.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Run-time configuration parameters
11//!
12//! ## Overview
13//! Materialize roughly follows the PostgreSQL configuration model, which works
14//! as follows. There is a global set of named configuration parameters, like
15//! `DateStyle` and `client_encoding`. These parameters can be set in several
16//! places: in an on-disk configuration file (in Postgres, named
17//! postgresql.conf), in command line arguments when the server is started, or
18//! at runtime via the `ALTER SYSTEM` or `SET` statements. Parameters that are
19//! set in a session take precedence over database defaults, which in turn take
20//! precedence over command line arguments, which in turn take precedence over
21//! settings in the on-disk configuration. Note that changing the value of
22//! parameters obeys transaction semantics: if a transaction fails to commit,
23//! any parameters that were changed in that transaction (i.e., via `SET`) will
24//! be rolled back to their previous value.
25//!
26//! The Materialize configuration hierarchy at the moment is much simpler.
27//! Global defaults are hardcoded into the binary, and a select few parameters
28//! can be overridden per session. A select few parameters can be overridden on
29//! disk.
30//!
31//! The set of variables that can be overridden per session and the set of
32//! variables that can be overridden on disk are currently disjoint. The
33//! infrastructure has been designed with an eye towards merging these two sets
34//! and supporting additional layers to the hierarchy, however, should the need
35//! arise.
36//!
37//! The configuration parameters that exist are driven by compatibility with
38//! PostgreSQL drivers that expect them, not because they are particularly
39//! important.
40//!
41//! ## Structure
42//! The most meaningful exports from this module are:
43//!
44//! - [`SessionVars`] represent per-session parameters, which each user can
45//!   access independently of one another, and are accessed via `SET`.
46//!
47//!   The fields of [`SessionVars`] are either;
48//!     - `SessionVar`, which is preferable and simply requires full support of
49//!       the `SessionVar` impl for its embedded value type.
50//!     - `ServerVar` for types that do not currently support everything
51//!       required by `SessionVar`, e.g. they are fixed-value parameters.
52//!
53//!   In the fullness of time, all fields in [`SessionVars`] should be
54//!   `SessionVar`.
55//!
56//! - [`SystemVars`] represent system-wide configuration settings and are
57//!   accessed via `ALTER SYSTEM SET`.
58//!
59//!   All elements of [`SystemVars`] are `SystemVar`.
60//!
61//! Some [`VarDefinition`] are also marked as a [`FeatureFlag`]; this is just a
62//! wrapper to make working with a set of [`VarDefinition`] easier, primarily from
63//! within SQL planning, where we might want to check if a feature is enabled
64//! before planning it.
65
66use std::borrow::Cow;
67use std::clone::Clone;
68use std::collections::BTreeMap;
69use std::fmt::Debug;
70use std::net::IpAddr;
71use std::num::NonZeroU32;
72use std::string::ToString;
73use std::sync::{Arc, LazyLock};
74use std::time::Duration;
75
76use chrono::{DateTime, Utc};
77use derivative::Derivative;
78use imbl::OrdMap;
79use mz_build_info::BuildInfo;
80use mz_dyncfg::{ConfigSet, ConfigType, ConfigUpdates, ConfigVal};
81use mz_persist_client::cfg::{
82    CRDB_CONNECT_TIMEOUT, CRDB_KEEPALIVES_IDLE, CRDB_KEEPALIVES_INTERVAL, CRDB_KEEPALIVES_RETRIES,
83    CRDB_TCP_USER_TIMEOUT,
84};
85use mz_repr::adt::numeric::Numeric;
86use mz_repr::adt::timestamp::CheckedTimestamp;
87use mz_repr::bytes::ByteSize;
88use mz_repr::user::{ExternalUserMetadata, InternalUserMetadata};
89use mz_tracing::{CloneableEnvFilter, SerializableDirective};
90use serde::Serialize;
91use thiserror::Error;
92use uncased::UncasedStr;
93
94use crate::ast::Ident;
95use crate::session::user::User;
96
97pub(crate) mod constraints;
98pub(crate) mod definitions;
99pub(crate) mod errors;
100pub(crate) mod polyfill;
101pub(crate) mod value;
102
103pub use definitions::*;
104pub use errors::*;
105pub use value::*;
106
107/// The action to take during end_transaction.
108///
109/// This enum lives here because of convenience: it's more of an adapter
110/// concept but [`SessionVars::end_transaction`] takes it.
111#[derive(Debug, Clone, Copy, PartialEq, Eq)]
112pub enum EndTransactionAction {
113    /// Commit the transaction.
114    Commit,
115    /// Rollback the transaction.
116    Rollback,
117}
118
119/// Represents the input to a variable.
120///
121/// Each variable has different rules for how it handles each style of input.
122/// This type allows us to defer interpretation of the input until the
123/// variable-specific interpretation can be applied.
124#[derive(Debug, Clone, Copy)]
125pub enum VarInput<'a> {
126    /// The input has been flattened into a single string.
127    Flat(&'a str),
128    /// The input comes from a SQL `SET` statement and is jumbled across
129    /// multiple components.
130    SqlSet(&'a [String]),
131}
132
133impl<'a> VarInput<'a> {
134    /// Converts the variable input to an owned vector of strings.
135    pub fn to_vec(&self) -> Vec<String> {
136        match self {
137            VarInput::Flat(v) => vec![v.to_string()],
138            VarInput::SqlSet(values) => values.into_iter().map(|v| v.to_string()).collect(),
139        }
140    }
141}
142
143/// An owned version of [`VarInput`].
144#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
145pub enum OwnedVarInput {
146    /// See [`VarInput::Flat`].
147    Flat(String),
148    /// See [`VarInput::SqlSet`].
149    SqlSet(Vec<String>),
150}
151
152impl OwnedVarInput {
153    /// Converts this owned variable input as a [`VarInput`].
154    pub fn borrow(&self) -> VarInput<'_> {
155        match self {
156            OwnedVarInput::Flat(v) => VarInput::Flat(v),
157            OwnedVarInput::SqlSet(v) => VarInput::SqlSet(v),
158        }
159    }
160}
161
162/// A `Var` represents a configuration parameter of an arbitrary type.
163pub trait Var: Debug {
164    /// Returns the name of the configuration parameter.
165    fn name(&self) -> &'static str;
166
167    /// Constructs a flattened string representation of the current value of the
168    /// configuration parameter.
169    ///
170    /// The resulting string is guaranteed to be parsable if provided to
171    /// `Value::parse` as a [`VarInput::Flat`].
172    fn value(&self) -> String;
173
174    /// Returns a short sentence describing the purpose of the configuration
175    /// parameter.
176    fn description(&self) -> &'static str;
177
178    /// Returns the name of the type of this variable.
179    fn type_name(&self) -> Cow<'static, str>;
180
181    /// Indicates wither the [`Var`] is visible as a function of the `user` and `system_vars`.
182    /// "Invisible" parameters return `VarErrors`.
183    ///
184    /// Variables marked as `internal` are only visible for the system user.
185    fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError>;
186
187    /// Reports whether the variable is only visible in unsafe mode.
188    fn is_unsafe(&self) -> bool {
189        self.name().starts_with("unsafe_")
190    }
191
192    /// Upcast `self` to a `dyn Var`, useful when working with multiple different implementors of
193    /// [`Var`].
194    fn as_var(&self) -> &dyn Var
195    where
196        Self: Sized,
197    {
198        self
199    }
200}
201
202/// A `SessionVar` is the session value for a configuration parameter. If unset,
203/// the server default is used instead.
204///
205/// Note: even though all of the different `*_value` fields are `Box<dyn Value>` they are enforced
206/// to be the same type because we use the `definition`s `parse(...)` method. This is guaranteed to
207/// return the same type as the compiled in default.
208#[derive(Debug)]
209pub struct SessionVar {
210    definition: VarDefinition,
211    /// System or Role default value.
212    default_value: Option<Box<dyn Value>>,
213    /// Value `LOCAL` to a transaction, will be unset at the completion of the transaction.
214    local_value: Option<Box<dyn Value>>,
215    /// Value set during a transaction, will be set if the transaction is committed.
216    staged_value: Option<Box<dyn Value>>,
217    /// Value that overrides the default.
218    session_value: Option<Box<dyn Value>>,
219}
220
221impl Clone for SessionVar {
222    fn clone(&self) -> Self {
223        SessionVar {
224            definition: self.definition.clone(),
225            default_value: self.default_value.as_ref().map(|v| v.box_clone()),
226            local_value: self.local_value.as_ref().map(|v| v.box_clone()),
227            staged_value: self.staged_value.as_ref().map(|v| v.box_clone()),
228            session_value: self.session_value.as_ref().map(|v| v.box_clone()),
229        }
230    }
231}
232
233impl SessionVar {
234    pub const fn new(var: VarDefinition) -> Self {
235        SessionVar {
236            definition: var,
237            default_value: None,
238            local_value: None,
239            staged_value: None,
240            session_value: None,
241        }
242    }
243
244    /// Checks if the provided [`VarInput`] is valid for the current session variable, returning
245    /// the formatted output if it's valid.
246    pub fn check(&self, input: VarInput) -> Result<String, VarError> {
247        let v = self.definition.parse(input)?;
248        self.validate_constraints(v.as_ref())?;
249
250        Ok(v.format())
251    }
252
253    /// Parse the input and update the stored value to match.
254    pub fn set(&mut self, input: VarInput, local: bool) -> Result<(), VarError> {
255        let v = self.definition.parse(input)?;
256
257        // Validate our parsed value.
258        self.validate_constraints(v.as_ref())?;
259
260        if local {
261            self.local_value = Some(v);
262        } else {
263            self.local_value = None;
264            self.staged_value = Some(v);
265        }
266        Ok(())
267    }
268
269    /// Sets the default value for the variable.
270    pub fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
271        let v = self.definition.parse(input)?;
272        self.validate_constraints(v.as_ref())?;
273        self.default_value = Some(v);
274        Ok(())
275    }
276
277    /// Reset the stored value to the default.
278    pub fn reset(&mut self, local: bool) {
279        let value = self
280            .default_value
281            .as_ref()
282            .map(|v| v.as_ref())
283            .unwrap_or_else(|| self.definition.value.value());
284        if local {
285            self.local_value = Some(value.box_clone());
286        } else {
287            self.local_value = None;
288            self.staged_value = Some(value.box_clone());
289        }
290    }
291
292    /// Returns a possibly new SessionVar if this needs to mutate at transaction end.
293    #[must_use]
294    pub fn end_transaction(&self, action: EndTransactionAction) -> Option<Self> {
295        if !self.is_mutating() {
296            return None;
297        }
298        let mut next: Self = self.clone();
299        next.local_value = None;
300        match action {
301            EndTransactionAction::Commit if next.staged_value.is_some() => {
302                next.session_value = next.staged_value.take()
303            }
304            _ => next.staged_value = None,
305        }
306        Some(next)
307    }
308
309    /// Whether this Var needs to mutate at the end of a transaction.
310    pub fn is_mutating(&self) -> bool {
311        self.local_value.is_some() || self.staged_value.is_some()
312    }
313
314    pub fn value_dyn(&self) -> &dyn Value {
315        self.local_value
316            .as_deref()
317            .or(self.staged_value.as_deref())
318            .or(self.session_value.as_deref())
319            .or(self.default_value.as_deref())
320            .unwrap_or_else(|| self.definition.value.value())
321    }
322
323    /// Returns the [`Value`] that is currently stored as the `session_value`.
324    ///
325    /// Note: This should __only__ be used for inspection, if you want to determine the current
326    /// value of this [`SessionVar`] you should use [`SessionVar::value`].
327    pub fn inspect_session_value(&self) -> Option<&dyn Value> {
328        self.session_value.as_deref()
329    }
330
331    fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
332        if let Some(constraint) = &self.definition.constraint {
333            constraint.check_constraint(self, self.value_dyn(), val)
334        } else {
335            Ok(())
336        }
337    }
338}
339
340impl Var for SessionVar {
341    fn name(&self) -> &'static str {
342        self.definition.name.as_str()
343    }
344
345    fn value(&self) -> String {
346        self.value_dyn().format()
347    }
348
349    fn description(&self) -> &'static str {
350        self.definition.description
351    }
352
353    fn type_name(&self) -> Cow<'static, str> {
354        self.definition.type_name()
355    }
356
357    fn visible(
358        &self,
359        user: &User,
360        system_vars: &super::vars::SystemVars,
361    ) -> Result<(), super::vars::VarError> {
362        self.definition.visible(user, system_vars)
363    }
364}
365
366#[derive(Debug, Clone, PartialEq, Eq)]
367pub struct MzVersion {
368    /// Inputs to computed variables.
369    build_info: &'static BuildInfo,
370    /// Helm chart version
371    helm_chart_version: Option<String>,
372}
373
374impl MzVersion {
375    pub fn new(build_info: &'static BuildInfo, helm_chart_version: Option<String>) -> Self {
376        MzVersion {
377            build_info,
378            helm_chart_version,
379        }
380    }
381}
382
383/// Session variables.
384///
385/// See the [`crate::session::vars`] module documentation for more details on the
386/// Materialize configuration model.
387#[derive(Debug, Clone)]
388pub struct SessionVars {
389    /// The set of all session variables.
390    vars: OrdMap<&'static UncasedStr, SessionVar>,
391    /// Inputs to computed variables.
392    mz_version: MzVersion,
393    /// Information about the user associated with this Session.
394    user: User,
395}
396
397impl SessionVars {
398    /// Creates a new [`SessionVars`] without considering the System or Role defaults.
399    pub fn new_unchecked(
400        build_info: &'static BuildInfo,
401        user: User,
402        helm_chart_version: Option<String>,
403    ) -> SessionVars {
404        use definitions::*;
405
406        let vars = [
407            &FAILPOINTS,
408            &SERVER_VERSION,
409            &SERVER_VERSION_NUM,
410            &SQL_SAFE_UPDATES,
411            &REAL_TIME_RECENCY,
412            &EMIT_PLAN_INSIGHTS_NOTICE,
413            &EMIT_TIMESTAMP_NOTICE,
414            &EMIT_TRACE_ID_NOTICE,
415            &AUTO_ROUTE_CATALOG_QUERIES,
416            &ENABLE_SESSION_RBAC_CHECKS,
417            &ENABLE_SESSION_CARDINALITY_ESTIMATES,
418            &MAX_IDENTIFIER_LENGTH,
419            &STATEMENT_LOGGING_SAMPLE_RATE,
420            &EMIT_INTROSPECTION_QUERY_NOTICE,
421            &UNSAFE_NEW_TRANSACTION_WALL_TIME,
422            &WELCOME_MESSAGE,
423        ]
424        .into_iter()
425        .chain(SESSION_SYSTEM_VARS.iter().map(|(_name, var)| *var))
426        .map(|var| (var.name, SessionVar::new(var.clone())))
427        .collect();
428
429        SessionVars {
430            vars,
431            mz_version: MzVersion::new(build_info, helm_chart_version),
432            user,
433        }
434    }
435
436    fn expect_value<V: Value>(&self, var: &VarDefinition) -> &V {
437        let var = self
438            .vars
439            .get(var.name)
440            .expect("provided var should be in state");
441        let val = var.value_dyn();
442        val.as_any().downcast_ref::<V>().expect("success")
443    }
444
445    /// Returns an iterator over the configuration parameters and their current
446    /// values for this session.
447    ///
448    /// Note that this function does not check that the access variable should
449    /// be visible because of other settings or users. Before or after accessing
450    /// this method, you should call `Var::visible`.
451    pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
452        #[allow(clippy::as_conversions)]
453        self.vars
454            .values()
455            .map(|v| v.as_var())
456            .chain([&self.mz_version as &dyn Var, &self.user])
457    }
458
459    /// Returns an iterator over configuration parameters (and their current
460    /// values for this session) that are expected to be sent to the client when
461    /// a new connection is established or when their value changes.
462    pub fn notify_set(&self) -> impl Iterator<Item = &dyn Var> {
463        // WARNING: variables in this set are not checked for visibility, and
464        // are assumed to be visible for all sessions.
465        //
466        // This is fixible with some elbow grease, but at the moment it seems
467        // unlikely that we'll have a variable in the notify set that shouldn't
468        // be visible to all sessions.
469        [
470            &APPLICATION_NAME,
471            &CLIENT_ENCODING,
472            &DATE_STYLE,
473            &INTEGER_DATETIMES,
474            &SERVER_VERSION,
475            &STANDARD_CONFORMING_STRINGS,
476            &TIMEZONE,
477            &INTERVAL_STYLE,
478            // Including `cluster`, `cluster_replica`, `database`, and `search_path` in the notify
479            // set is a Materialize extension. Doing so allows users to more easily identify where
480            // their queries will be executing, which is important to know when you consider the
481            // size of a cluster, what indexes are present, etc.
482            &CLUSTER,
483            &CLUSTER_REPLICA,
484            &DEFAULT_CLUSTER_REPLICATION_FACTOR,
485            &DATABASE,
486            &SEARCH_PATH,
487        ]
488        .into_iter()
489        .map(|v| self.vars[v.name].as_var())
490        // Including `mz_version` in the notify set is a Materialize
491        // extension. Doing so allows applications to detect whether they
492        // are talking to Materialize or PostgreSQL without an additional
493        // network roundtrip. This is known to be safe because CockroachDB
494        // has an analogous extension [0].
495        // [0]: https://github.com/cockroachdb/cockroach/blob/369c4057a/pkg/sql/pgwire/conn.go#L1840
496        .chain(std::iter::once(self.mz_version.as_var()))
497    }
498
499    /// Resets all variables to their default value.
500    pub fn reset_all(&mut self) {
501        let names: Vec<_> = self.vars.keys().copied().collect();
502        for name in names {
503            self.vars[name].reset(false);
504        }
505    }
506
507    /// Returns a [`Var`] representing the configuration parameter with the
508    /// specified name.
509    ///
510    /// Configuration parameters are matched case insensitively. If no such
511    /// configuration parameter exists, `get` returns an error.
512    ///
513    /// Note that if `name` is known at compile time, you should instead use the
514    /// named accessor to access the variable with its true Rust type. For
515    /// example, `self.get("sql_safe_updates").value()` returns the string
516    /// `"true"` or `"false"`, while `self.sql_safe_updates()` returns a bool.
517    pub fn get(&self, system_vars: &SystemVars, name: &str) -> Result<&dyn Var, VarError> {
518        let name = compat_translate_name(name);
519
520        let name = UncasedStr::new(name);
521        if name == MZ_VERSION_NAME {
522            Ok(&self.mz_version)
523        } else if name == IS_SUPERUSER_NAME {
524            Ok(&self.user)
525        } else {
526            self.vars
527                .get(name)
528                .map(|v| {
529                    v.visible(&self.user, system_vars)?;
530                    Ok(v.as_var())
531                })
532                .transpose()?
533                .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
534        }
535    }
536
537    /// Returns a [`SessionVar`] for inspection.
538    ///
539    /// Note: If you're trying to determine the value of the variable with `name` you should
540    /// instead use the named accessor, or [`SessionVars::get`].
541    pub fn inspect(&self, name: &str) -> Result<&SessionVar, VarError> {
542        let name = compat_translate_name(name);
543
544        self.vars
545            .get(UncasedStr::new(name))
546            .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
547    }
548
549    /// Sets the configuration parameter named `name` to the value represented
550    /// by `value`.
551    ///
552    /// The new value may be either committed or rolled back by the next call to
553    /// [`SessionVars::end_transaction`]. If `local` is true, the new value is always
554    /// discarded by the next call to [`SessionVars::end_transaction`], even if the
555    /// transaction is marked to commit.
556    ///
557    /// Like with [`SessionVars::get`], configuration parameters are matched case
558    /// insensitively. If `value` is not valid, as determined by the underlying
559    /// configuration parameter, or if the named configuration parameter does
560    /// not exist, an error is returned.
561    pub fn set(
562        &mut self,
563        system_vars: &SystemVars,
564        name: &str,
565        input: VarInput,
566        local: bool,
567    ) -> Result<(), VarError> {
568        let (name, input) = compat_translate(name, input);
569
570        let name = UncasedStr::new(name);
571        self.check_read_only(name)?;
572
573        self.vars
574            .get_mut(name)
575            .map(|v| {
576                v.visible(&self.user, system_vars)?;
577                v.set(input, local)
578            })
579            .transpose()?
580            .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
581    }
582
583    /// Sets the default value for the parameter named `name` to the value
584    /// represented by `value`.
585    pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
586        let (name, input) = compat_translate(name, input);
587
588        let name = UncasedStr::new(name);
589        self.check_read_only(name)?;
590
591        self.vars
592            .get_mut(name)
593            // Note: visibility is checked when persisting a role default.
594            .map(|v| v.set_default(input))
595            .transpose()?
596            .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
597    }
598
599    /// Sets the configuration parameter named `name` to its default value.
600    ///
601    /// The new value may be either committed or rolled back by the next call to
602    /// [`SessionVars::end_transaction`]. If `local` is true, the new value is
603    /// always discarded by the next call to [`SessionVars::end_transaction`],
604    /// even if the transaction is marked to commit.
605    ///
606    /// Like with [`SessionVars::get`], configuration parameters are matched
607    /// case insensitively. If the named configuration parameter does not exist,
608    /// an error is returned.
609    ///
610    /// If the variable does not exist or the user does not have the visibility
611    /// requires, this function returns an error.
612    pub fn reset(
613        &mut self,
614        system_vars: &SystemVars,
615        name: &str,
616        local: bool,
617    ) -> Result<(), VarError> {
618        let name = compat_translate_name(name);
619
620        let name = UncasedStr::new(name);
621        self.check_read_only(name)?;
622
623        self.vars
624            .get_mut(name)
625            .map(|v| {
626                v.visible(&self.user, system_vars)?;
627                v.reset(local);
628                Ok(())
629            })
630            .transpose()?
631            .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
632    }
633
634    /// Returns an error if the variable corresponding to `name` is read only.
635    fn check_read_only(&self, name: &UncasedStr) -> Result<(), VarError> {
636        if name == MZ_VERSION_NAME {
637            Err(VarError::ReadOnlyParameter(MZ_VERSION_NAME.as_str()))
638        } else if name == IS_SUPERUSER_NAME {
639            Err(VarError::ReadOnlyParameter(IS_SUPERUSER_NAME.as_str()))
640        } else if name == MAX_IDENTIFIER_LENGTH.name {
641            Err(VarError::ReadOnlyParameter(
642                MAX_IDENTIFIER_LENGTH.name.as_str(),
643            ))
644        } else {
645            Ok(())
646        }
647    }
648
649    /// Commits or rolls back configuration parameter updates made via
650    /// [`SessionVars::set`] since the last call to `end_transaction`.
651    ///
652    /// Returns any session parameters that changed because the transaction ended.
653    #[mz_ore::instrument(level = "debug")]
654    pub fn end_transaction(
655        &mut self,
656        action: EndTransactionAction,
657    ) -> BTreeMap<&'static str, String> {
658        let mut changed = BTreeMap::new();
659        let mut updates = Vec::new();
660        for (name, var) in self.vars.iter() {
661            if !var.is_mutating() {
662                continue;
663            }
664            let before = var.value();
665            let next = var.end_transaction(action).expect("must mutate");
666            let after = next.value();
667            updates.push((*name, next));
668
669            // Report the new value of the parameter.
670            if before != after {
671                changed.insert(var.name(), after);
672            }
673        }
674        self.vars.extend(updates);
675        changed
676    }
677
678    /// Returns the value of the `application_name` configuration parameter.
679    pub fn application_name(&self) -> &str {
680        self.expect_value::<String>(&APPLICATION_NAME).as_str()
681    }
682
683    /// Returns the build info.
684    pub fn build_info(&self) -> &'static BuildInfo {
685        self.mz_version.build_info
686    }
687
688    /// Returns the value of the `client_encoding` configuration parameter.
689    pub fn client_encoding(&self) -> &ClientEncoding {
690        self.expect_value(&CLIENT_ENCODING)
691    }
692
693    /// Returns the value of the `client_min_messages` configuration parameter.
694    pub fn client_min_messages(&self) -> &ClientSeverity {
695        self.expect_value(&CLIENT_MIN_MESSAGES)
696    }
697
698    /// Returns the value of the `cluster` configuration parameter.
699    pub fn cluster(&self) -> &str {
700        self.expect_value::<String>(&CLUSTER).as_str()
701    }
702
703    /// Returns the value of the `cluster_replica` configuration parameter.
704    pub fn cluster_replica(&self) -> Option<&str> {
705        self.expect_value::<Option<String>>(&CLUSTER_REPLICA)
706            .as_deref()
707    }
708
709    /// Returns the value of the `current_object_missing_warnings` configuration
710    /// parameter.
711    pub fn current_object_missing_warnings(&self) -> bool {
712        *self.expect_value::<bool>(&CURRENT_OBJECT_MISSING_WARNINGS)
713    }
714
715    /// Returns the value of the `DateStyle` configuration parameter.
716    pub fn date_style(&self) -> &[&str] {
717        &self.expect_value::<DateStyle>(&DATE_STYLE).0
718    }
719
720    /// Returns the value of the `database` configuration parameter.
721    pub fn database(&self) -> &str {
722        self.expect_value::<String>(&DATABASE).as_str()
723    }
724
725    /// Returns the value of the `extra_float_digits` configuration parameter.
726    pub fn extra_float_digits(&self) -> i32 {
727        *self.expect_value(&EXTRA_FLOAT_DIGITS)
728    }
729
730    /// Returns the value of the `integer_datetimes` configuration parameter.
731    pub fn integer_datetimes(&self) -> bool {
732        *self.expect_value(&INTEGER_DATETIMES)
733    }
734
735    /// Returns the value of the `intervalstyle` configuration parameter.
736    pub fn intervalstyle(&self) -> &IntervalStyle {
737        self.expect_value(&INTERVAL_STYLE)
738    }
739
740    /// Returns the value of the `mz_version` configuration parameter.
741    pub fn mz_version(&self) -> String {
742        self.mz_version.value()
743    }
744
745    /// Returns the value of the `search_path` configuration parameter.
746    pub fn search_path(&self) -> &[Ident] {
747        self.expect_value::<Vec<Ident>>(&SEARCH_PATH).as_slice()
748    }
749
750    /// Returns the value of the `server_version` configuration parameter.
751    pub fn server_version(&self) -> &str {
752        self.expect_value::<String>(&SERVER_VERSION).as_str()
753    }
754
755    /// Returns the value of the `server_version_num` configuration parameter.
756    pub fn server_version_num(&self) -> i32 {
757        *self.expect_value(&SERVER_VERSION_NUM)
758    }
759
760    /// Returns the value of the `sql_safe_updates` configuration parameter.
761    pub fn sql_safe_updates(&self) -> bool {
762        *self.expect_value(&SQL_SAFE_UPDATES)
763    }
764
765    /// Returns the value of the `standard_conforming_strings` configuration
766    /// parameter.
767    pub fn standard_conforming_strings(&self) -> bool {
768        *self.expect_value(&STANDARD_CONFORMING_STRINGS)
769    }
770
771    /// Returns the value of the `statement_timeout` configuration parameter.
772    pub fn statement_timeout(&self) -> &Duration {
773        self.expect_value(&STATEMENT_TIMEOUT)
774    }
775
776    /// Returns the value of the `idle_in_transaction_session_timeout` configuration parameter.
777    pub fn idle_in_transaction_session_timeout(&self) -> &Duration {
778        self.expect_value(&IDLE_IN_TRANSACTION_SESSION_TIMEOUT)
779    }
780
781    /// Returns the value of the `timezone` configuration parameter.
782    pub fn timezone(&self) -> &TimeZone {
783        self.expect_value(&TIMEZONE)
784    }
785
786    /// Returns the value of the `transaction_isolation` configuration
787    /// parameter.
788    pub fn transaction_isolation(&self) -> &IsolationLevel {
789        self.expect_value(&TRANSACTION_ISOLATION)
790    }
791
792    /// Returns the value of `real_time_recency` configuration parameter.
793    pub fn real_time_recency(&self) -> bool {
794        *self.expect_value(&REAL_TIME_RECENCY)
795    }
796
797    /// Returns the value of the `real_time_recency_timeout` configuration parameter.
798    pub fn real_time_recency_timeout(&self) -> &Duration {
799        self.expect_value(&REAL_TIME_RECENCY_TIMEOUT)
800    }
801
802    /// Returns the value of `emit_plan_insights_notice` configuration parameter.
803    pub fn emit_plan_insights_notice(&self) -> bool {
804        *self.expect_value(&EMIT_PLAN_INSIGHTS_NOTICE)
805    }
806
807    /// Returns the value of `emit_timestamp_notice` configuration parameter.
808    pub fn emit_timestamp_notice(&self) -> bool {
809        *self.expect_value(&EMIT_TIMESTAMP_NOTICE)
810    }
811
812    /// Returns the value of `emit_trace_id_notice` configuration parameter.
813    pub fn emit_trace_id_notice(&self) -> bool {
814        *self.expect_value(&EMIT_TRACE_ID_NOTICE)
815    }
816
817    /// Returns the value of `auto_route_catalog_queries` configuration parameter.
818    pub fn auto_route_catalog_queries(&self) -> bool {
819        *self.expect_value(&AUTO_ROUTE_CATALOG_QUERIES)
820    }
821
822    /// Returns the value of `enable_session_rbac_checks` configuration parameter.
823    pub fn enable_session_rbac_checks(&self) -> bool {
824        *self.expect_value(&ENABLE_SESSION_RBAC_CHECKS)
825    }
826
827    /// Returns the value of `enable_session_cardinality_estimates` configuration parameter.
828    pub fn enable_session_cardinality_estimates(&self) -> bool {
829        *self.expect_value(&ENABLE_SESSION_CARDINALITY_ESTIMATES)
830    }
831
832    /// Returns the value of `is_superuser` configuration parameter.
833    pub fn is_superuser(&self) -> bool {
834        self.user.is_superuser()
835    }
836
837    /// Returns the user associated with this `SessionVars` instance.
838    pub fn user(&self) -> &User {
839        &self.user
840    }
841
842    /// Returns the value of the `max_query_result_size` configuration parameter.
843    pub fn max_query_result_size(&self) -> u64 {
844        self.expect_value::<ByteSize>(&MAX_QUERY_RESULT_SIZE)
845            .as_bytes()
846    }
847
848    /// Sets the internal metadata associated with the user.
849    pub fn set_internal_user_metadata(&mut self, metadata: InternalUserMetadata) {
850        self.user.internal_metadata = Some(metadata);
851    }
852
853    /// Sets the external metadata associated with the user.
854    pub fn set_external_user_metadata(&mut self, metadata: ExternalUserMetadata) {
855        self.user.external_metadata = Some(metadata);
856    }
857
858    pub fn set_cluster(&mut self, cluster: String) {
859        let var = self
860            .vars
861            .get_mut(UncasedStr::new(CLUSTER.name()))
862            .expect("cluster variable must exist");
863        var.set(VarInput::Flat(&cluster), false)
864            .expect("setting cluster must succeed");
865    }
866
867    pub fn set_local_transaction_isolation(&mut self, transaction_isolation: IsolationLevel) {
868        let var = self
869            .vars
870            .get_mut(UncasedStr::new(TRANSACTION_ISOLATION.name()))
871            .expect("transaction_isolation variable must exist");
872        var.set(VarInput::Flat(transaction_isolation.as_str()), true)
873            .expect("setting transaction isolation must succeed");
874    }
875
876    pub fn get_statement_logging_sample_rate(&self) -> Numeric {
877        *self.expect_value(&STATEMENT_LOGGING_SAMPLE_RATE)
878    }
879
880    /// Returns the value of the `emit_introspection_query_notice` configuration parameter.
881    pub fn emit_introspection_query_notice(&self) -> bool {
882        *self.expect_value(&EMIT_INTROSPECTION_QUERY_NOTICE)
883    }
884
885    pub fn unsafe_new_transaction_wall_time(&self) -> Option<CheckedTimestamp<DateTime<Utc>>> {
886        *self.expect_value(&UNSAFE_NEW_TRANSACTION_WALL_TIME)
887    }
888
889    /// Returns the value of the `welcome_message` configuration parameter.
890    pub fn welcome_message(&self) -> bool {
891        *self.expect_value(&WELCOME_MESSAGE)
892    }
893}
894
895// TODO(database-issues#8069) remove together with `compat_translate`
896pub const OLD_CATALOG_SERVER_CLUSTER: &str = "mz_introspection";
897pub const OLD_AUTO_ROUTE_CATALOG_QUERIES: &str = "auto_route_introspection_queries";
898
899/// If the given variable name and/or input is deprecated, return a corresponding updated value,
900/// otherwise return the original.
901///
902/// This method was introduced to gracefully handle the rename of the `mz_introspection` cluster to
903/// `mz_cluster_server`. The plan is to remove it once all users have migrated to the new name. The
904/// debug logs will be helpful for checking this in production.
905// TODO(database-issues#8069) remove this after sufficient time has passed
906fn compat_translate<'a, 'b>(name: &'a str, input: VarInput<'b>) -> (&'a str, VarInput<'b>) {
907    if name == CLUSTER.name() {
908        if let Ok(value) = CLUSTER.parse(input) {
909            if value.format() == OLD_CATALOG_SERVER_CLUSTER {
910                tracing::debug!(
911                    github_27285 = true,
912                    "encountered deprecated `cluster` variable value: {}",
913                    OLD_CATALOG_SERVER_CLUSTER,
914                );
915                return (name, VarInput::Flat("mz_catalog_server"));
916            }
917        }
918    }
919
920    if name == OLD_AUTO_ROUTE_CATALOG_QUERIES {
921        tracing::debug!(
922            github_27285 = true,
923            "encountered deprecated `{}` variable name",
924            OLD_AUTO_ROUTE_CATALOG_QUERIES,
925        );
926        return (AUTO_ROUTE_CATALOG_QUERIES.name(), input);
927    }
928
929    (name, input)
930}
931
932fn compat_translate_name(name: &str) -> &str {
933    let (name, _) = compat_translate(name, VarInput::Flat(""));
934    name
935}
936
937/// A `SystemVar` is persisted on disk value for a configuration parameter. If unset,
938/// the server default is used instead.
939#[derive(Debug)]
940pub struct SystemVar {
941    definition: VarDefinition,
942    /// Value currently persisted to disk.
943    persisted_value: Option<Box<dyn Value>>,
944    /// Current default, not persisted to disk.
945    dynamic_default: Option<Box<dyn Value>>,
946}
947
948impl Clone for SystemVar {
949    fn clone(&self) -> Self {
950        SystemVar {
951            definition: self.definition.clone(),
952            persisted_value: self.persisted_value.as_ref().map(|v| v.box_clone()),
953            dynamic_default: self.dynamic_default.as_ref().map(|v| v.box_clone()),
954        }
955    }
956}
957
958impl SystemVar {
959    pub fn new(definition: VarDefinition) -> Self {
960        SystemVar {
961            definition,
962            persisted_value: None,
963            dynamic_default: None,
964        }
965    }
966
967    fn is_default(&self, input: VarInput) -> Result<bool, VarError> {
968        let v = self.definition.parse(input)?;
969        Ok(self.definition.default_value() == v.as_ref())
970    }
971
972    pub fn value_dyn(&self) -> &dyn Value {
973        self.persisted_value
974            .as_deref()
975            .or(self.dynamic_default.as_deref())
976            .unwrap_or_else(|| self.definition.default_value())
977    }
978
979    pub fn value<V: 'static>(&self) -> &V {
980        let val = self.value_dyn();
981        val.as_any().downcast_ref::<V>().expect("success")
982    }
983
984    fn parse(&self, input: VarInput) -> Result<Box<dyn Value>, VarError> {
985        let v = self.definition.parse(input)?;
986        // Validate our parsed value.
987        self.validate_constraints(v.as_ref())?;
988        Ok(v)
989    }
990
991    fn set(&mut self, input: VarInput) -> Result<bool, VarError> {
992        let v = self.parse(input)?;
993
994        if self.persisted_value.as_ref() != Some(&v) {
995            self.persisted_value = Some(v);
996            Ok(true)
997        } else {
998            Ok(false)
999        }
1000    }
1001
1002    fn reset(&mut self) -> bool {
1003        if self.persisted_value.is_some() {
1004            self.persisted_value = None;
1005            true
1006        } else {
1007            false
1008        }
1009    }
1010
1011    fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
1012        let v = self.definition.parse(input)?;
1013        self.dynamic_default = Some(v);
1014        Ok(())
1015    }
1016
1017    fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
1018        if let Some(constraint) = &self.definition.constraint {
1019            constraint.check_constraint(self, self.value_dyn(), val)
1020        } else {
1021            Ok(())
1022        }
1023    }
1024}
1025
1026impl Var for SystemVar {
1027    fn name(&self) -> &'static str {
1028        self.definition.name.as_str()
1029    }
1030
1031    fn value(&self) -> String {
1032        self.value_dyn().format()
1033    }
1034
1035    fn description(&self) -> &'static str {
1036        self.definition.description
1037    }
1038
1039    fn type_name(&self) -> Cow<'static, str> {
1040        self.definition.type_name()
1041    }
1042
1043    fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError> {
1044        self.definition.visible(user, system_vars)
1045    }
1046}
1047
1048#[derive(Debug, Error)]
1049pub enum NetworkPolicyError {
1050    #[error("Access denied for address {0}")]
1051    AddressDenied(IpAddr),
1052}
1053
1054/// On disk variables.
1055///
1056/// See the [`crate::session::vars`] module documentation for more details on the
1057/// Materialize configuration model.
1058#[derive(Derivative, Clone)]
1059#[derivative(Debug)]
1060pub struct SystemVars {
1061    /// Allows "unsafe" parameters to be set.
1062    allow_unsafe: bool,
1063    /// Set of all [`SystemVar`]s.
1064    vars: BTreeMap<&'static UncasedStr, SystemVar>,
1065    /// External components interested in when a [`SystemVar`] gets updated.
1066    #[derivative(Debug = "ignore")]
1067    callbacks: BTreeMap<String, Vec<Arc<dyn Fn(&SystemVars) + Send + Sync>>>,
1068
1069    /// NB: This is intentionally disconnected from the one that is plumbed around to persist and
1070    /// the controllers. This is so we can explicitly control and reason about when changes to config
1071    /// values are propagated to the rest of the system.
1072    dyncfgs: ConfigSet,
1073}
1074
1075impl Default for SystemVars {
1076    fn default() -> Self {
1077        Self::new()
1078    }
1079}
1080
1081impl SystemVars {
1082    pub fn new() -> Self {
1083        let system_vars = vec![
1084            &MAX_KAFKA_CONNECTIONS,
1085            &MAX_POSTGRES_CONNECTIONS,
1086            &MAX_MYSQL_CONNECTIONS,
1087            &MAX_SQL_SERVER_CONNECTIONS,
1088            &MAX_AWS_PRIVATELINK_CONNECTIONS,
1089            &MAX_TABLES,
1090            &MAX_SOURCES,
1091            &MAX_SINKS,
1092            &MAX_MATERIALIZED_VIEWS,
1093            &MAX_CLUSTERS,
1094            &MAX_REPLICAS_PER_CLUSTER,
1095            &MAX_CREDIT_CONSUMPTION_RATE,
1096            &MAX_DATABASES,
1097            &MAX_SCHEMAS_PER_DATABASE,
1098            &MAX_OBJECTS_PER_SCHEMA,
1099            &MAX_SECRETS,
1100            &MAX_ROLES,
1101            &MAX_CONTINUAL_TASKS,
1102            &MAX_NETWORK_POLICIES,
1103            &MAX_RULES_PER_NETWORK_POLICY,
1104            &MAX_RESULT_SIZE,
1105            &MAX_COPY_FROM_ROW_SIZE,
1106            &ALLOWED_CLUSTER_REPLICA_SIZES,
1107            &upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE,
1108            &upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET,
1109            &upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES,
1110            &upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO,
1111            &upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM,
1112            &upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE,
1113            &upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE,
1114            &upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE,
1115            &upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION,
1116            &upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS,
1117            &upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS,
1118            &upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB,
1119            &upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO,
1120            &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1121            &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES,
1122            &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL,
1123            &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES,
1124            &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION,
1125            &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY,
1126            &STORAGE_STATISTICS_INTERVAL,
1127            &STORAGE_STATISTICS_COLLECTION_INTERVAL,
1128            &STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO,
1129            &STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS,
1130            &PERSIST_FAST_PATH_LIMIT,
1131            &METRICS_RETENTION,
1132            &UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP,
1133            &ENABLE_RBAC_CHECKS,
1134            &PG_SOURCE_CONNECT_TIMEOUT,
1135            &PG_SOURCE_TCP_KEEPALIVES_IDLE,
1136            &PG_SOURCE_TCP_KEEPALIVES_INTERVAL,
1137            &PG_SOURCE_TCP_KEEPALIVES_RETRIES,
1138            &PG_SOURCE_TCP_USER_TIMEOUT,
1139            &PG_SOURCE_TCP_CONFIGURE_SERVER,
1140            &PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT,
1141            &PG_SOURCE_WAL_SENDER_TIMEOUT,
1142            &PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT,
1143            &MYSQL_SOURCE_TCP_KEEPALIVE,
1144            &MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME,
1145            &MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT,
1146            &MYSQL_SOURCE_CONNECT_TIMEOUT,
1147            &SSH_CHECK_INTERVAL,
1148            &SSH_CONNECT_TIMEOUT,
1149            &SSH_KEEPALIVES_IDLE,
1150            &KAFKA_SOCKET_KEEPALIVE,
1151            &KAFKA_SOCKET_TIMEOUT,
1152            &KAFKA_TRANSACTION_TIMEOUT,
1153            &KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT,
1154            &KAFKA_FETCH_METADATA_TIMEOUT,
1155            &KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT,
1156            &ENABLE_LAUNCHDARKLY,
1157            &MAX_CONNECTIONS,
1158            &NETWORK_POLICY,
1159            &SUPERUSER_RESERVED_CONNECTIONS,
1160            &KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES,
1161            &KEEP_N_SINK_STATUS_HISTORY_ENTRIES,
1162            &KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES,
1163            &REPLICA_STATUS_HISTORY_RETENTION_WINDOW,
1164            &ENABLE_STORAGE_SHARD_FINALIZATION,
1165            &ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE,
1166            &ENABLE_DEFAULT_CONNECTION_VALIDATION,
1167            &DEFAULT_TIMESTAMP_INTERVAL,
1168            &MIN_TIMESTAMP_INTERVAL,
1169            &MAX_TIMESTAMP_INTERVAL,
1170            &LOGGING_FILTER,
1171            &OPENTELEMETRY_FILTER,
1172            &LOGGING_FILTER_DEFAULTS,
1173            &OPENTELEMETRY_FILTER_DEFAULTS,
1174            &SENTRY_FILTERS,
1175            &WEBHOOKS_SECRETS_CACHING_TTL_SECS,
1176            &COORD_SLOW_MESSAGE_WARN_THRESHOLD,
1177            &grpc_client::CONNECT_TIMEOUT,
1178            &grpc_client::HTTP2_KEEP_ALIVE_INTERVAL,
1179            &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1180            &cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT,
1181            &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY,
1182            &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT,
1183            &cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD,
1184            &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE,
1185            &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW,
1186            &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS,
1187            &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT,
1188            &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY,
1189            &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT,
1190            &cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL,
1191            &cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL,
1192            &cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED,
1193            &cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE,
1194            &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1195            &STATEMENT_LOGGING_MAX_SAMPLE_RATE,
1196            &STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE,
1197            &STATEMENT_LOGGING_TARGET_DATA_RATE,
1198            &STATEMENT_LOGGING_MAX_DATA_CREDIT,
1199            &ENABLE_INTERNAL_STATEMENT_LOGGING,
1200            &OPTIMIZER_STATS_TIMEOUT,
1201            &OPTIMIZER_ONESHOT_STATS_TIMEOUT,
1202            &PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE,
1203            &WEBHOOK_CONCURRENT_REQUEST_LIMIT,
1204            &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE,
1205            &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT,
1206            &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL,
1207            &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER,
1208            &USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION,
1209            &FORCE_SOURCE_TABLE_SYNTAX,
1210            &OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD,
1211            &SCRAM_ITERATIONS,
1212        ];
1213
1214        let dyncfgs = mz_dyncfgs::all_dyncfgs();
1215        let dyncfg_vars: Vec<_> = dyncfgs
1216            .entries()
1217            .map(|cfg| match cfg.default() {
1218                ConfigVal::Bool(default) => {
1219                    VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1220                }
1221                ConfigVal::U32(default) => {
1222                    VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1223                }
1224                ConfigVal::Usize(default) => {
1225                    VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1226                }
1227                ConfigVal::OptUsize(default) => {
1228                    VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1229                }
1230                ConfigVal::F64(default) => {
1231                    VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1232                }
1233                ConfigVal::String(default) => {
1234                    VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1235                }
1236                ConfigVal::OptString(default) => {
1237                    VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1238                }
1239                ConfigVal::Duration(default) => {
1240                    VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1241                }
1242                ConfigVal::Json(default) => {
1243                    VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1244                }
1245            })
1246            .collect();
1247
1248        let vars: BTreeMap<_, _> = system_vars
1249            .into_iter()
1250            // Include all of our feature flags.
1251            .chain(definitions::FEATURE_FLAGS.iter().copied())
1252            // Include the subset of Session variables we allow system defaults for.
1253            .chain(SESSION_SYSTEM_VARS.values().copied())
1254            .cloned()
1255            // Include Persist configs.
1256            .chain(dyncfg_vars)
1257            .map(|var| (var.name, SystemVar::new(var)))
1258            .collect();
1259
1260        let vars = SystemVars {
1261            vars,
1262            callbacks: BTreeMap::new(),
1263            allow_unsafe: false,
1264            dyncfgs,
1265        };
1266
1267        vars
1268    }
1269
1270    pub fn dyncfgs(&self) -> &ConfigSet {
1271        &self.dyncfgs
1272    }
1273
1274    pub fn set_unsafe(mut self, allow_unsafe: bool) -> Self {
1275        self.allow_unsafe = allow_unsafe;
1276        self
1277    }
1278
1279    pub fn allow_unsafe(&self) -> bool {
1280        self.allow_unsafe
1281    }
1282
1283    fn expect_value<V: 'static>(&self, var: &VarDefinition) -> &V {
1284        let val = self
1285            .vars
1286            .get(var.name)
1287            .expect("provided var should be in state");
1288
1289        val.value_dyn()
1290            .as_any()
1291            .downcast_ref::<V>()
1292            .expect("provided var type should matched stored var")
1293    }
1294
1295    fn expect_config_value<V: ConfigType + 'static>(&self, name: &UncasedStr) -> &V {
1296        let val = self
1297            .vars
1298            .get(name)
1299            .unwrap_or_else(|| panic!("provided var {name} should be in state"));
1300
1301        val.value_dyn()
1302            .as_any()
1303            .downcast_ref()
1304            .expect("provided var type should matched stored var")
1305    }
1306
1307    /// Reset all the values to their defaults (preserving
1308    /// defaults from `VarMut::set_default).
1309    pub fn reset_all(&mut self) {
1310        for (_, var) in &mut self.vars {
1311            var.reset();
1312        }
1313    }
1314
1315    /// Returns an iterator over the configuration parameters and their current
1316    /// values on disk.
1317    pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
1318        self.vars
1319            .values()
1320            .map(|v| v.as_var())
1321            .filter(|v| !SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1322    }
1323
1324    /// Returns an iterator over the configuration parameters and their current
1325    /// values on disk. Compared to [`SystemVars::iter`], this should omit vars
1326    /// that shouldn't be synced by SystemParameterFrontend.
1327    pub fn iter_synced(&self) -> impl Iterator<Item = &dyn Var> {
1328        self.iter().filter(|v| v.name() != ENABLE_LAUNCHDARKLY.name)
1329    }
1330
1331    /// Returns an iterator over the configuration parameters that can be overriden per-Session.
1332    pub fn iter_session(&self) -> impl Iterator<Item = &dyn Var> {
1333        self.vars
1334            .values()
1335            .map(|v| v.as_var())
1336            .filter(|v| SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1337    }
1338
1339    /// Returns whether or not this parameter can be modified by a superuser.
1340    pub fn user_modifiable(&self, name: &str) -> bool {
1341        SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(name))
1342            || name == ENABLE_RBAC_CHECKS.name()
1343            || name == NETWORK_POLICY.name()
1344    }
1345
1346    /// Returns a [`Var`] representing the configuration parameter with the
1347    /// specified name.
1348    ///
1349    /// Configuration parameters are matched case insensitively. If no such
1350    /// configuration parameter exists, `get` returns an error.
1351    ///
1352    /// Note that:
1353    /// - If `name` is known at compile time, you should instead use the named
1354    /// accessor to access the variable with its true Rust type. For example,
1355    /// `self.get("max_tables").value()` returns the string `"25"` or the
1356    /// current value, while `self.max_tables()` returns an i32.
1357    ///
1358    /// - This function does not check that the access variable should be
1359    /// visible because of other settings or users. Before or after accessing
1360    /// this method, you should call `Var::visible`.
1361    ///
1362    /// # Errors
1363    ///
1364    /// The call will return an error:
1365    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1366    pub fn get(&self, name: &str) -> Result<&dyn Var, VarError> {
1367        self.vars
1368            .get(UncasedStr::new(name))
1369            .map(|v| v.as_var())
1370            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1371    }
1372
1373    /// Check if the given `values` is the default value for the [`Var`]
1374    /// identified by `name`.
1375    ///
1376    /// Note that this function does not check that the access variable should
1377    /// be visible because of other settings or users. Before or after accessing
1378    /// this method, you should call `Var::visible`.
1379    ///
1380    /// # Errors
1381    ///
1382    /// The call will return an error:
1383    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1384    /// 2. If `values` does not represent a valid [`SystemVars`] value for
1385    ///    `name`.
1386    pub fn is_default(&self, name: &str, input: VarInput) -> Result<bool, VarError> {
1387        self.vars
1388            .get(UncasedStr::new(name))
1389            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1390            .and_then(|v| v.is_default(input))
1391    }
1392
1393    /// Sets the configuration parameter named `name` to the value represented
1394    /// by `input`.
1395    ///
1396    /// Like with [`SystemVars::get`], configuration parameters are matched case
1397    /// insensitively. If `input` is not valid, as determined by the underlying
1398    /// configuration parameter, or if the named configuration parameter does
1399    /// not exist, an error is returned.
1400    ///
1401    /// Return a `bool` value indicating whether the [`Var`] identified by
1402    /// `name` was modified by this call (it won't be if it already had the
1403    /// given `input`).
1404    ///
1405    /// Note that this function does not check that the access variable should
1406    /// be visible because of other settings or users. Before or after accessing
1407    /// this method, you should call `Var::visible`.
1408    ///
1409    /// # Errors
1410    ///
1411    /// The call will return an error:
1412    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1413    /// 2. If `input` does not represent a valid [`SystemVars`] value for
1414    ///    `name`.
1415    pub fn set(&mut self, name: &str, input: VarInput) -> Result<bool, VarError> {
1416        let result = self
1417            .vars
1418            .get_mut(UncasedStr::new(name))
1419            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1420            .and_then(|v| v.set(input))?;
1421        self.notify_callbacks(name);
1422        Ok(result)
1423    }
1424
1425    /// Parses the configuration parameter value represented by `input` named
1426    /// `name`.
1427    ///
1428    /// Like with [`SystemVars::get`], configuration parameters are matched case
1429    /// insensitively. If `input` is not valid, as determined by the underlying
1430    /// configuration parameter, or if the named configuration parameter does
1431    /// not exist, an error is returned.
1432    ///
1433    /// Return a `Box<dyn Value>` that is the result of parsing `input`.
1434    ///
1435    /// Note that this function does not check that the access variable should
1436    /// be visible because of other settings or users. Before or after accessing
1437    /// this method, you should call `Var::visible`.
1438    ///
1439    /// # Errors
1440    ///
1441    /// The call will return an error:
1442    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1443    /// 2. If `input` does not represent a valid [`SystemVars`] value for
1444    ///    `name`.
1445    pub fn parse(&self, name: &str, input: VarInput) -> Result<Box<dyn Value>, VarError> {
1446        self.vars
1447            .get(UncasedStr::new(name))
1448            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1449            .and_then(|v| v.parse(input))
1450    }
1451
1452    /// Set the default for this variable. This is the value this
1453    /// variable will be be `reset` to. If no default is set, the static default in the
1454    /// variable definition is used instead.
1455    ///
1456    /// Note that this function does not check that the access variable should
1457    /// be visible because of other settings or users. Before or after accessing
1458    /// this method, you should call `Var::visible`.
1459    pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
1460        let result = self
1461            .vars
1462            .get_mut(UncasedStr::new(name))
1463            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1464            .and_then(|v| v.set_default(input))?;
1465        self.notify_callbacks(name);
1466        Ok(result)
1467    }
1468
1469    /// Sets the configuration parameter named `name` to its default value.
1470    ///
1471    /// Like with [`SystemVars::get`], configuration parameters are matched case
1472    /// insensitively. If the named configuration parameter does not exist, an
1473    /// error is returned.
1474    ///
1475    /// Return a `bool` value indicating whether the [`Var`] identified by
1476    /// `name` was modified by this call (it won't be if was already reset).
1477    ///
1478    /// Note that this function does not check that the access variable should
1479    /// be visible because of other settings or users. Before or after accessing
1480    /// this method, you should call `Var::visible`.
1481    ///
1482    /// # Errors
1483    ///
1484    /// The call will return an error:
1485    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1486    pub fn reset(&mut self, name: &str) -> Result<bool, VarError> {
1487        let result = self
1488            .vars
1489            .get_mut(UncasedStr::new(name))
1490            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1491            .map(|v| v.reset())?;
1492        self.notify_callbacks(name);
1493        Ok(result)
1494    }
1495
1496    /// Returns a map from each system parameter's name to its default value.
1497    pub fn defaults(&self) -> BTreeMap<String, String> {
1498        self.vars
1499            .iter()
1500            .map(|(name, var)| {
1501                let default = var
1502                    .dynamic_default
1503                    .as_deref()
1504                    .unwrap_or_else(|| var.definition.default_value());
1505                (name.as_str().to_owned(), default.format())
1506            })
1507            .collect()
1508    }
1509
1510    /// Registers a closure that will get called when the value for the
1511    /// specified [`VarDefinition`] changes.
1512    ///
1513    /// The callback is guaranteed to be called at least once.
1514    pub fn register_callback(
1515        &mut self,
1516        var: &VarDefinition,
1517        callback: Arc<dyn Fn(&SystemVars) + Send + Sync>,
1518    ) {
1519        self.callbacks
1520            .entry(var.name().to_string())
1521            .or_default()
1522            .push(callback);
1523        self.notify_callbacks(var.name());
1524    }
1525
1526    /// Notify any external components interested in this variable.
1527    fn notify_callbacks(&self, name: &str) {
1528        // Get the callbacks interested in this variable.
1529        if let Some(callbacks) = self.callbacks.get(name) {
1530            for callback in callbacks {
1531                (callback)(self);
1532            }
1533        }
1534    }
1535
1536    /// Returns the system default for the [`CLUSTER`] session variable. To know the active cluster
1537    /// for the current session, you must check the [`SessionVars`].
1538    pub fn default_cluster(&self) -> String {
1539        self.expect_value::<String>(&CLUSTER).to_owned()
1540    }
1541
1542    /// Returns the value of the `max_kafka_connections` configuration parameter.
1543    pub fn max_kafka_connections(&self) -> u32 {
1544        *self.expect_value(&MAX_KAFKA_CONNECTIONS)
1545    }
1546
1547    /// Returns the value of the `max_postgres_connections` configuration parameter.
1548    pub fn max_postgres_connections(&self) -> u32 {
1549        *self.expect_value(&MAX_POSTGRES_CONNECTIONS)
1550    }
1551
1552    /// Returns the value of the `max_mysql_connections` configuration parameter.
1553    pub fn max_mysql_connections(&self) -> u32 {
1554        *self.expect_value(&MAX_MYSQL_CONNECTIONS)
1555    }
1556
1557    /// Returns the value of the `max_sql_server_connections` configuration parameter.
1558    pub fn max_sql_server_connections(&self) -> u32 {
1559        *self.expect_value(&MAX_SQL_SERVER_CONNECTIONS)
1560    }
1561
1562    /// Returns the value of the `max_aws_privatelink_connections` configuration parameter.
1563    pub fn max_aws_privatelink_connections(&self) -> u32 {
1564        *self.expect_value(&MAX_AWS_PRIVATELINK_CONNECTIONS)
1565    }
1566
1567    /// Returns the value of the `max_tables` configuration parameter.
1568    pub fn max_tables(&self) -> u32 {
1569        *self.expect_value(&MAX_TABLES)
1570    }
1571
1572    /// Returns the value of the `max_sources` configuration parameter.
1573    pub fn max_sources(&self) -> u32 {
1574        *self.expect_value(&MAX_SOURCES)
1575    }
1576
1577    /// Returns the value of the `max_sinks` configuration parameter.
1578    pub fn max_sinks(&self) -> u32 {
1579        *self.expect_value(&MAX_SINKS)
1580    }
1581
1582    /// Returns the value of the `max_materialized_views` configuration parameter.
1583    pub fn max_materialized_views(&self) -> u32 {
1584        *self.expect_value(&MAX_MATERIALIZED_VIEWS)
1585    }
1586
1587    /// Returns the value of the `max_clusters` configuration parameter.
1588    pub fn max_clusters(&self) -> u32 {
1589        *self.expect_value(&MAX_CLUSTERS)
1590    }
1591
1592    /// Returns the value of the `max_replicas_per_cluster` configuration parameter.
1593    pub fn max_replicas_per_cluster(&self) -> u32 {
1594        *self.expect_value(&MAX_REPLICAS_PER_CLUSTER)
1595    }
1596
1597    /// Returns the value of the `max_credit_consumption_rate` configuration parameter.
1598    pub fn max_credit_consumption_rate(&self) -> Numeric {
1599        *self.expect_value(&MAX_CREDIT_CONSUMPTION_RATE)
1600    }
1601
1602    /// Returns the value of the `max_databases` configuration parameter.
1603    pub fn max_databases(&self) -> u32 {
1604        *self.expect_value(&MAX_DATABASES)
1605    }
1606
1607    /// Returns the value of the `max_schemas_per_database` configuration parameter.
1608    pub fn max_schemas_per_database(&self) -> u32 {
1609        *self.expect_value(&MAX_SCHEMAS_PER_DATABASE)
1610    }
1611
1612    /// Returns the value of the `max_objects_per_schema` configuration parameter.
1613    pub fn max_objects_per_schema(&self) -> u32 {
1614        *self.expect_value(&MAX_OBJECTS_PER_SCHEMA)
1615    }
1616
1617    /// Returns the value of the `max_secrets` configuration parameter.
1618    pub fn max_secrets(&self) -> u32 {
1619        *self.expect_value(&MAX_SECRETS)
1620    }
1621
1622    /// Returns the value of the `max_roles` configuration parameter.
1623    pub fn max_roles(&self) -> u32 {
1624        *self.expect_value(&MAX_ROLES)
1625    }
1626
1627    /// Returns the value of the `max_continual_tasks` configuration parameter.
1628    pub fn max_continual_tasks(&self) -> u32 {
1629        *self.expect_value(&MAX_CONTINUAL_TASKS)
1630    }
1631
1632    /// Returns the value of the `max_network_policies` configuration parameter.
1633    pub fn max_network_policies(&self) -> u32 {
1634        *self.expect_value(&MAX_NETWORK_POLICIES)
1635    }
1636
1637    /// Returns the value of the `max_network_policies` configuration parameter.
1638    pub fn max_rules_per_network_policy(&self) -> u32 {
1639        *self.expect_value(&MAX_RULES_PER_NETWORK_POLICY)
1640    }
1641
1642    /// Returns the value of the `max_result_size` configuration parameter.
1643    pub fn max_result_size(&self) -> u64 {
1644        self.expect_value::<ByteSize>(&MAX_RESULT_SIZE).as_bytes()
1645    }
1646
1647    /// Returns the value of the `max_copy_from_row_size` configuration parameter.
1648    pub fn max_copy_from_row_size(&self) -> u64 {
1649        self.expect_value::<ByteSize>(&MAX_COPY_FROM_ROW_SIZE)
1650            .as_bytes()
1651    }
1652
1653    /// Returns the value of the `allowed_cluster_replica_sizes` configuration parameter.
1654    pub fn allowed_cluster_replica_sizes(&self) -> Vec<String> {
1655        self.expect_value::<Vec<Ident>>(&ALLOWED_CLUSTER_REPLICA_SIZES)
1656            .into_iter()
1657            .map(|s| s.as_str().into())
1658            .collect()
1659    }
1660
1661    /// Returns the value of the `default_cluster_replication_factor` configuration parameter.
1662    pub fn default_cluster_replication_factor(&self) -> u32 {
1663        *self.expect_value::<u32>(&DEFAULT_CLUSTER_REPLICATION_FACTOR)
1664    }
1665
1666    pub fn upsert_rocksdb_compaction_style(&self) -> mz_rocksdb_types::config::CompactionStyle {
1667        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE)
1668    }
1669
1670    pub fn upsert_rocksdb_optimize_compaction_memtable_budget(&self) -> usize {
1671        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET)
1672    }
1673
1674    pub fn upsert_rocksdb_level_compaction_dynamic_level_bytes(&self) -> bool {
1675        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES)
1676    }
1677
1678    pub fn upsert_rocksdb_universal_compaction_ratio(&self) -> i32 {
1679        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO)
1680    }
1681
1682    pub fn upsert_rocksdb_parallelism(&self) -> Option<i32> {
1683        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM)
1684    }
1685
1686    pub fn upsert_rocksdb_compression_type(&self) -> mz_rocksdb_types::config::CompressionType {
1687        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE)
1688    }
1689
1690    pub fn upsert_rocksdb_bottommost_compression_type(
1691        &self,
1692    ) -> mz_rocksdb_types::config::CompressionType {
1693        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE)
1694    }
1695
1696    pub fn upsert_rocksdb_batch_size(&self) -> usize {
1697        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE)
1698    }
1699
1700    pub fn upsert_rocksdb_retry_duration(&self) -> Duration {
1701        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION)
1702    }
1703
1704    pub fn upsert_rocksdb_stats_log_interval_seconds(&self) -> u32 {
1705        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS)
1706    }
1707
1708    pub fn upsert_rocksdb_stats_persist_interval_seconds(&self) -> u32 {
1709        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS)
1710    }
1711
1712    pub fn upsert_rocksdb_point_lookup_block_cache_size_mb(&self) -> Option<u32> {
1713        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB)
1714    }
1715
1716    pub fn upsert_rocksdb_shrink_allocated_buffers_by_ratio(&self) -> usize {
1717        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO)
1718    }
1719
1720    pub fn upsert_rocksdb_write_buffer_manager_cluster_memory_fraction(&self) -> Option<Numeric> {
1721        *self.expect_value(
1722            &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1723        )
1724    }
1725
1726    pub fn upsert_rocksdb_write_buffer_manager_memory_bytes(&self) -> Option<usize> {
1727        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES)
1728    }
1729
1730    pub fn upsert_rocksdb_write_buffer_manager_allow_stall(&self) -> bool {
1731        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL)
1732    }
1733
1734    pub fn persist_fast_path_limit(&self) -> usize {
1735        *self.expect_value(&PERSIST_FAST_PATH_LIMIT)
1736    }
1737
1738    /// Returns the `pg_source_connect_timeout` configuration parameter.
1739    pub fn pg_source_connect_timeout(&self) -> Duration {
1740        *self.expect_value(&PG_SOURCE_CONNECT_TIMEOUT)
1741    }
1742
1743    /// Returns the `pg_source_tcp_keepalives_retries` configuration parameter.
1744    pub fn pg_source_tcp_keepalives_retries(&self) -> u32 {
1745        *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_RETRIES)
1746    }
1747
1748    /// Returns the `pg_source_tcp_keepalives_idle` configuration parameter.
1749    pub fn pg_source_tcp_keepalives_idle(&self) -> Duration {
1750        *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_IDLE)
1751    }
1752
1753    /// Returns the `pg_source_tcp_keepalives_interval` configuration parameter.
1754    pub fn pg_source_tcp_keepalives_interval(&self) -> Duration {
1755        *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_INTERVAL)
1756    }
1757
1758    /// Returns the `pg_source_tcp_user_timeout` configuration parameter.
1759    pub fn pg_source_tcp_user_timeout(&self) -> Duration {
1760        *self.expect_value(&PG_SOURCE_TCP_USER_TIMEOUT)
1761    }
1762
1763    /// Returns the `pg_source_tcp_configure_server` configuration parameter.
1764    pub fn pg_source_tcp_configure_server(&self) -> bool {
1765        *self.expect_value(&PG_SOURCE_TCP_CONFIGURE_SERVER)
1766    }
1767
1768    /// Returns the `pg_source_snapshot_statement_timeout` configuration parameter.
1769    pub fn pg_source_snapshot_statement_timeout(&self) -> Duration {
1770        *self.expect_value(&PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT)
1771    }
1772
1773    /// Returns the `pg_source_wal_sender_timeout` configuration parameter.
1774    pub fn pg_source_wal_sender_timeout(&self) -> Option<Duration> {
1775        *self.expect_value(&PG_SOURCE_WAL_SENDER_TIMEOUT)
1776    }
1777
1778    /// Returns the `pg_source_snapshot_collect_strict_count` configuration parameter.
1779    pub fn pg_source_snapshot_collect_strict_count(&self) -> bool {
1780        *self.expect_value(&PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT)
1781    }
1782
1783    /// Returns the `mysql_source_tcp_keepalive` configuration parameter.
1784    pub fn mysql_source_tcp_keepalive(&self) -> Duration {
1785        *self.expect_value(&MYSQL_SOURCE_TCP_KEEPALIVE)
1786    }
1787
1788    /// Returns the `mysql_source_snapshot_max_execution_time` configuration parameter.
1789    pub fn mysql_source_snapshot_max_execution_time(&self) -> Duration {
1790        *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME)
1791    }
1792
1793    /// Returns the `mysql_source_snapshot_lock_wait_timeout` configuration parameter.
1794    pub fn mysql_source_snapshot_lock_wait_timeout(&self) -> Duration {
1795        *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT)
1796    }
1797
1798    /// Returns the `mysql_source_connect_timeout` configuration parameter.
1799    pub fn mysql_source_connect_timeout(&self) -> Duration {
1800        *self.expect_value(&MYSQL_SOURCE_CONNECT_TIMEOUT)
1801    }
1802
1803    /// Returns the `ssh_check_interval` configuration parameter.
1804    pub fn ssh_check_interval(&self) -> Duration {
1805        *self.expect_value(&SSH_CHECK_INTERVAL)
1806    }
1807
1808    /// Returns the `ssh_connect_timeout` configuration parameter.
1809    pub fn ssh_connect_timeout(&self) -> Duration {
1810        *self.expect_value(&SSH_CONNECT_TIMEOUT)
1811    }
1812
1813    /// Returns the `ssh_keepalives_idle` configuration parameter.
1814    pub fn ssh_keepalives_idle(&self) -> Duration {
1815        *self.expect_value(&SSH_KEEPALIVES_IDLE)
1816    }
1817
1818    /// Returns the `kafka_socket_keepalive` configuration parameter.
1819    pub fn kafka_socket_keepalive(&self) -> bool {
1820        *self.expect_value(&KAFKA_SOCKET_KEEPALIVE)
1821    }
1822
1823    /// Returns the `kafka_socket_timeout` configuration parameter.
1824    pub fn kafka_socket_timeout(&self) -> Option<Duration> {
1825        *self.expect_value(&KAFKA_SOCKET_TIMEOUT)
1826    }
1827
1828    /// Returns the `kafka_transaction_timeout` configuration parameter.
1829    pub fn kafka_transaction_timeout(&self) -> Duration {
1830        *self.expect_value(&KAFKA_TRANSACTION_TIMEOUT)
1831    }
1832
1833    /// Returns the `kafka_socket_connection_setup_timeout` configuration parameter.
1834    pub fn kafka_socket_connection_setup_timeout(&self) -> Duration {
1835        *self.expect_value(&KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT)
1836    }
1837
1838    /// Returns the `kafka_fetch_metadata_timeout` configuration parameter.
1839    pub fn kafka_fetch_metadata_timeout(&self) -> Duration {
1840        *self.expect_value(&KAFKA_FETCH_METADATA_TIMEOUT)
1841    }
1842
1843    /// Returns the `kafka_progress_record_fetch_timeout` configuration parameter.
1844    pub fn kafka_progress_record_fetch_timeout(&self) -> Option<Duration> {
1845        *self.expect_value(&KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT)
1846    }
1847
1848    /// Returns the `crdb_connect_timeout` configuration parameter.
1849    pub fn crdb_connect_timeout(&self) -> Duration {
1850        *self.expect_config_value(UncasedStr::new(
1851            mz_persist_client::cfg::CRDB_CONNECT_TIMEOUT.name(),
1852        ))
1853    }
1854
1855    /// Returns the `crdb_tcp_user_timeout` configuration parameter.
1856    pub fn crdb_tcp_user_timeout(&self) -> Duration {
1857        *self.expect_config_value(UncasedStr::new(
1858            mz_persist_client::cfg::CRDB_TCP_USER_TIMEOUT.name(),
1859        ))
1860    }
1861
1862    /// Returns the `crdb_keepalives_idle` configuration parameter.
1863    pub fn crdb_keepalives_idle(&self) -> Duration {
1864        *self.expect_config_value(UncasedStr::new(
1865            mz_persist_client::cfg::CRDB_KEEPALIVES_IDLE.name(),
1866        ))
1867    }
1868
1869    /// Returns the `crdb_keepalives_interval` configuration parameter.
1870    pub fn crdb_keepalives_interval(&self) -> Duration {
1871        *self.expect_config_value(UncasedStr::new(
1872            mz_persist_client::cfg::CRDB_KEEPALIVES_INTERVAL.name(),
1873        ))
1874    }
1875
1876    /// Returns the `crdb_keepalives_retries` configuration parameter.
1877    pub fn crdb_keepalives_retries(&self) -> u32 {
1878        *self.expect_config_value(UncasedStr::new(
1879            mz_persist_client::cfg::CRDB_KEEPALIVES_RETRIES.name(),
1880        ))
1881    }
1882
1883    /// Returns the `storage_dataflow_max_inflight_bytes` configuration parameter.
1884    pub fn storage_dataflow_max_inflight_bytes(&self) -> Option<usize> {
1885        *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES)
1886    }
1887
1888    /// Returns the `storage_dataflow_max_inflight_bytes_to_cluster_size_fraction` configuration parameter.
1889    pub fn storage_dataflow_max_inflight_bytes_to_cluster_size_fraction(&self) -> Option<Numeric> {
1890        *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION)
1891    }
1892
1893    /// Returns the `storage_shrink_upsert_unused_buffers_by_ratio` configuration parameter.
1894    pub fn storage_shrink_upsert_unused_buffers_by_ratio(&self) -> usize {
1895        *self.expect_value(&STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO)
1896    }
1897
1898    /// Returns the `storage_dataflow_max_inflight_bytes_disk_only` configuration parameter.
1899    pub fn storage_dataflow_max_inflight_bytes_disk_only(&self) -> bool {
1900        *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY)
1901    }
1902
1903    /// Returns the `storage_statistics_interval` configuration parameter.
1904    pub fn storage_statistics_interval(&self) -> Duration {
1905        *self.expect_value(&STORAGE_STATISTICS_INTERVAL)
1906    }
1907
1908    /// Returns the `storage_statistics_collection_interval` configuration parameter.
1909    pub fn storage_statistics_collection_interval(&self) -> Duration {
1910        *self.expect_value(&STORAGE_STATISTICS_COLLECTION_INTERVAL)
1911    }
1912
1913    /// Returns the `storage_record_source_sink_namespaced_errors` configuration parameter.
1914    pub fn storage_record_source_sink_namespaced_errors(&self) -> bool {
1915        *self.expect_value(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS)
1916    }
1917
1918    /// Returns the `persist_stats_filter_enabled` configuration parameter.
1919    pub fn persist_stats_filter_enabled(&self) -> bool {
1920        *self.expect_config_value(UncasedStr::new(
1921            mz_persist_client::stats::STATS_FILTER_ENABLED.name(),
1922        ))
1923    }
1924
1925    pub fn scram_iterations(&self) -> NonZeroU32 {
1926        *self.expect_value(&SCRAM_ITERATIONS)
1927    }
1928
1929    pub fn dyncfg_updates(&self) -> ConfigUpdates {
1930        let mut updates = ConfigUpdates::default();
1931        for entry in self.dyncfgs.entries() {
1932            let name = UncasedStr::new(entry.name());
1933            let val = match entry.val() {
1934                ConfigVal::Bool(_) => ConfigVal::from(*self.expect_config_value::<bool>(name)),
1935                ConfigVal::U32(_) => ConfigVal::from(*self.expect_config_value::<u32>(name)),
1936                ConfigVal::Usize(_) => ConfigVal::from(*self.expect_config_value::<usize>(name)),
1937                ConfigVal::OptUsize(_) => {
1938                    ConfigVal::from(*self.expect_config_value::<Option<usize>>(name))
1939                }
1940                ConfigVal::F64(_) => ConfigVal::from(*self.expect_config_value::<f64>(name)),
1941                ConfigVal::String(_) => {
1942                    ConfigVal::from(self.expect_config_value::<String>(name).clone())
1943                }
1944                ConfigVal::OptString(_) => {
1945                    ConfigVal::from(self.expect_config_value::<Option<String>>(name).clone())
1946                }
1947                ConfigVal::Duration(_) => {
1948                    ConfigVal::from(*self.expect_config_value::<Duration>(name))
1949                }
1950                ConfigVal::Json(_) => {
1951                    ConfigVal::from(self.expect_config_value::<serde_json::Value>(name).clone())
1952                }
1953            };
1954            updates.add_dynamic(entry.name(), val);
1955        }
1956        updates.apply(&self.dyncfgs);
1957        updates
1958    }
1959
1960    /// Returns the `metrics_retention` configuration parameter.
1961    pub fn metrics_retention(&self) -> Duration {
1962        *self.expect_value(&METRICS_RETENTION)
1963    }
1964
1965    /// Returns the `unsafe_mock_audit_event_timestamp` configuration parameter.
1966    pub fn unsafe_mock_audit_event_timestamp(&self) -> Option<mz_repr::Timestamp> {
1967        *self.expect_value(&UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP)
1968    }
1969
1970    /// Returns the `enable_rbac_checks` configuration parameter.
1971    pub fn enable_rbac_checks(&self) -> bool {
1972        *self.expect_value(&ENABLE_RBAC_CHECKS)
1973    }
1974
1975    /// Returns the `max_connections` configuration parameter.
1976    pub fn max_connections(&self) -> u32 {
1977        *self.expect_value(&MAX_CONNECTIONS)
1978    }
1979
1980    pub fn default_network_policy_name(&self) -> String {
1981        self.expect_value::<String>(&NETWORK_POLICY).clone()
1982    }
1983
1984    /// Returns the `superuser_reserved_connections` configuration parameter.
1985    pub fn superuser_reserved_connections(&self) -> u32 {
1986        *self.expect_value(&SUPERUSER_RESERVED_CONNECTIONS)
1987    }
1988
1989    pub fn keep_n_source_status_history_entries(&self) -> usize {
1990        *self.expect_value(&KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES)
1991    }
1992
1993    pub fn keep_n_sink_status_history_entries(&self) -> usize {
1994        *self.expect_value(&KEEP_N_SINK_STATUS_HISTORY_ENTRIES)
1995    }
1996
1997    pub fn keep_n_privatelink_status_history_entries(&self) -> usize {
1998        *self.expect_value(&KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES)
1999    }
2000
2001    pub fn replica_status_history_retention_window(&self) -> Duration {
2002        *self.expect_value(&REPLICA_STATUS_HISTORY_RETENTION_WINDOW)
2003    }
2004
2005    /// Returns the `enable_storage_shard_finalization` configuration parameter.
2006    pub fn enable_storage_shard_finalization(&self) -> bool {
2007        *self.expect_value(&ENABLE_STORAGE_SHARD_FINALIZATION)
2008    }
2009
2010    pub fn enable_consolidate_after_union_negate(&self) -> bool {
2011        *self.expect_value(&ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE)
2012    }
2013
2014    /// Returns the `enable_default_connection_validation` configuration parameter.
2015    pub fn enable_default_connection_validation(&self) -> bool {
2016        *self.expect_value(&ENABLE_DEFAULT_CONNECTION_VALIDATION)
2017    }
2018
2019    /// Returns the `default_timestamp_interval` configuration parameter.
2020    pub fn default_timestamp_interval(&self) -> Duration {
2021        *self.expect_value(&DEFAULT_TIMESTAMP_INTERVAL)
2022    }
2023
2024    /// Returns the `min_timestamp_interval` configuration parameter.
2025    pub fn min_timestamp_interval(&self) -> Duration {
2026        *self.expect_value(&MIN_TIMESTAMP_INTERVAL)
2027    }
2028    /// Returns the `max_timestamp_interval` configuration parameter.
2029    pub fn max_timestamp_interval(&self) -> Duration {
2030        *self.expect_value(&MAX_TIMESTAMP_INTERVAL)
2031    }
2032
2033    pub fn logging_filter(&self) -> CloneableEnvFilter {
2034        self.expect_value::<CloneableEnvFilter>(&LOGGING_FILTER)
2035            .clone()
2036    }
2037
2038    pub fn opentelemetry_filter(&self) -> CloneableEnvFilter {
2039        self.expect_value::<CloneableEnvFilter>(&OPENTELEMETRY_FILTER)
2040            .clone()
2041    }
2042
2043    pub fn logging_filter_defaults(&self) -> Vec<SerializableDirective> {
2044        self.expect_value::<Vec<SerializableDirective>>(&LOGGING_FILTER_DEFAULTS)
2045            .clone()
2046    }
2047
2048    pub fn opentelemetry_filter_defaults(&self) -> Vec<SerializableDirective> {
2049        self.expect_value::<Vec<SerializableDirective>>(&OPENTELEMETRY_FILTER_DEFAULTS)
2050            .clone()
2051    }
2052
2053    pub fn sentry_filters(&self) -> Vec<SerializableDirective> {
2054        self.expect_value::<Vec<SerializableDirective>>(&SENTRY_FILTERS)
2055            .clone()
2056    }
2057
2058    pub fn webhooks_secrets_caching_ttl_secs(&self) -> usize {
2059        *self.expect_value(&WEBHOOKS_SECRETS_CACHING_TTL_SECS)
2060    }
2061
2062    pub fn coord_slow_message_warn_threshold(&self) -> Duration {
2063        *self.expect_value(&COORD_SLOW_MESSAGE_WARN_THRESHOLD)
2064    }
2065
2066    pub fn grpc_client_http2_keep_alive_interval(&self) -> Duration {
2067        *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_INTERVAL)
2068    }
2069
2070    pub fn grpc_client_http2_keep_alive_timeout(&self) -> Duration {
2071        *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT)
2072    }
2073
2074    pub fn grpc_connect_timeout(&self) -> Duration {
2075        *self.expect_value(&grpc_client::CONNECT_TIMEOUT)
2076    }
2077
2078    pub fn cluster_multi_process_replica_az_affinity_weight(&self) -> Option<i32> {
2079        *self.expect_value(&cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT)
2080    }
2081
2082    pub fn cluster_soften_replication_anti_affinity(&self) -> bool {
2083        *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY)
2084    }
2085
2086    pub fn cluster_soften_replication_anti_affinity_weight(&self) -> i32 {
2087        *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT)
2088    }
2089
2090    pub fn cluster_enable_topology_spread(&self) -> bool {
2091        *self.expect_value(&cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD)
2092    }
2093
2094    pub fn cluster_topology_spread_ignore_non_singular_scale(&self) -> bool {
2095        *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE)
2096    }
2097
2098    pub fn cluster_topology_spread_max_skew(&self) -> i32 {
2099        *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW)
2100    }
2101
2102    pub fn cluster_topology_spread_set_min_domains(&self) -> Option<i32> {
2103        *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS)
2104    }
2105
2106    pub fn cluster_topology_spread_soft(&self) -> bool {
2107        *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT)
2108    }
2109
2110    pub fn cluster_soften_az_affinity(&self) -> bool {
2111        *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY)
2112    }
2113
2114    pub fn cluster_soften_az_affinity_weight(&self) -> i32 {
2115        *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT)
2116    }
2117
2118    pub fn cluster_alter_check_ready_interval(&self) -> Duration {
2119        *self.expect_value(&cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL)
2120    }
2121
2122    pub fn cluster_check_scheduling_policies_interval(&self) -> Duration {
2123        *self.expect_value(&cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL)
2124    }
2125
2126    pub fn cluster_security_context_enabled(&self) -> bool {
2127        *self.expect_value(&cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED)
2128    }
2129
2130    pub fn cluster_refresh_mv_compaction_estimate(&self) -> Duration {
2131        *self.expect_value(&cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE)
2132    }
2133
2134    /// Returns the `privatelink_status_update_quota_per_minute` configuration parameter.
2135    pub fn privatelink_status_update_quota_per_minute(&self) -> u32 {
2136        *self.expect_value(&PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE)
2137    }
2138
2139    pub fn statement_logging_target_data_rate(&self) -> Option<usize> {
2140        *self.expect_value(&STATEMENT_LOGGING_TARGET_DATA_RATE)
2141    }
2142
2143    pub fn statement_logging_max_data_credit(&self) -> Option<usize> {
2144        *self.expect_value(&STATEMENT_LOGGING_MAX_DATA_CREDIT)
2145    }
2146
2147    /// Returns the `statement_logging_max_sample_rate` configuration parameter.
2148    pub fn statement_logging_max_sample_rate(&self) -> Numeric {
2149        *self.expect_value(&STATEMENT_LOGGING_MAX_SAMPLE_RATE)
2150    }
2151
2152    /// Returns the `statement_logging_default_sample_rate` configuration parameter.
2153    pub fn statement_logging_default_sample_rate(&self) -> Numeric {
2154        *self.expect_value(&STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE)
2155    }
2156
2157    /// Returns the `enable_internal_statement_logging` configuration parameter.
2158    pub fn enable_internal_statement_logging(&self) -> bool {
2159        *self.expect_value(&ENABLE_INTERNAL_STATEMENT_LOGGING)
2160    }
2161
2162    /// Returns the `optimizer_stats_timeout` configuration parameter.
2163    pub fn optimizer_stats_timeout(&self) -> Duration {
2164        *self.expect_value(&OPTIMIZER_STATS_TIMEOUT)
2165    }
2166
2167    /// Returns the `optimizer_oneshot_stats_timeout` configuration parameter.
2168    pub fn optimizer_oneshot_stats_timeout(&self) -> Duration {
2169        *self.expect_value(&OPTIMIZER_ONESHOT_STATS_TIMEOUT)
2170    }
2171
2172    /// Returns the `webhook_concurrent_request_limit` configuration parameter.
2173    pub fn webhook_concurrent_request_limit(&self) -> usize {
2174        *self.expect_value(&WEBHOOK_CONCURRENT_REQUEST_LIMIT)
2175    }
2176
2177    /// Returns the `pg_timestamp_oracle_connection_pool_max_size` configuration parameter.
2178    pub fn pg_timestamp_oracle_connection_pool_max_size(&self) -> usize {
2179        *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE)
2180    }
2181
2182    /// Returns the `pg_timestamp_oracle_connection_pool_max_wait` configuration parameter.
2183    pub fn pg_timestamp_oracle_connection_pool_max_wait(&self) -> Option<Duration> {
2184        *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT)
2185    }
2186
2187    /// Returns the `pg_timestamp_oracle_connection_pool_ttl` configuration parameter.
2188    pub fn pg_timestamp_oracle_connection_pool_ttl(&self) -> Duration {
2189        *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL)
2190    }
2191
2192    /// Returns the `pg_timestamp_oracle_connection_pool_ttl_stagger` configuration parameter.
2193    pub fn pg_timestamp_oracle_connection_pool_ttl_stagger(&self) -> Duration {
2194        *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER)
2195    }
2196
2197    /// Returns the `user_storage_managed_collections_batch_duration` configuration parameter.
2198    pub fn user_storage_managed_collections_batch_duration(&self) -> Duration {
2199        *self.expect_value(&USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION)
2200    }
2201
2202    pub fn force_source_table_syntax(&self) -> bool {
2203        *self.expect_value(&FORCE_SOURCE_TABLE_SYNTAX)
2204    }
2205
2206    pub fn optimizer_e2e_latency_warning_threshold(&self) -> Duration {
2207        *self.expect_value(&OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD)
2208    }
2209
2210    /// Returns whether the named variable is a controller configuration parameter.
2211    pub fn is_controller_config_var(&self, name: &str) -> bool {
2212        self.is_dyncfg_var(name)
2213    }
2214
2215    /// Returns whether the named variable is a compute configuration parameter
2216    /// (things that go in `ComputeParameters` and are sent to replicas via `UpdateConfiguration`
2217    /// commands).
2218    pub fn is_compute_config_var(&self, name: &str) -> bool {
2219        name == MAX_RESULT_SIZE.name() || self.is_dyncfg_var(name) || is_tracing_var(name)
2220    }
2221
2222    /// Returns whether the named variable is a metrics configuration parameter
2223    pub fn is_metrics_config_var(&self, name: &str) -> bool {
2224        self.is_dyncfg_var(name)
2225    }
2226
2227    /// Returns whether the named variable is a storage configuration parameter.
2228    pub fn is_storage_config_var(&self, name: &str) -> bool {
2229        name == PG_SOURCE_CONNECT_TIMEOUT.name()
2230            || name == PG_SOURCE_TCP_KEEPALIVES_IDLE.name()
2231            || name == PG_SOURCE_TCP_KEEPALIVES_INTERVAL.name()
2232            || name == PG_SOURCE_TCP_KEEPALIVES_RETRIES.name()
2233            || name == PG_SOURCE_TCP_USER_TIMEOUT.name()
2234            || name == PG_SOURCE_TCP_CONFIGURE_SERVER.name()
2235            || name == PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT.name()
2236            || name == PG_SOURCE_WAL_SENDER_TIMEOUT.name()
2237            || name == PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT.name()
2238            || name == MYSQL_SOURCE_TCP_KEEPALIVE.name()
2239            || name == MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME.name()
2240            || name == MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT.name()
2241            || name == MYSQL_SOURCE_CONNECT_TIMEOUT.name()
2242            || name == ENABLE_STORAGE_SHARD_FINALIZATION.name()
2243            || name == SSH_CHECK_INTERVAL.name()
2244            || name == SSH_CONNECT_TIMEOUT.name()
2245            || name == SSH_KEEPALIVES_IDLE.name()
2246            || name == KAFKA_SOCKET_KEEPALIVE.name()
2247            || name == KAFKA_SOCKET_TIMEOUT.name()
2248            || name == KAFKA_TRANSACTION_TIMEOUT.name()
2249            || name == KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT.name()
2250            || name == KAFKA_FETCH_METADATA_TIMEOUT.name()
2251            || name == KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT.name()
2252            || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES.name()
2253            || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION.name()
2254            || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY.name()
2255            || name == STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO.name()
2256            || name == STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS.name()
2257            || name == STORAGE_STATISTICS_INTERVAL.name()
2258            || name == STORAGE_STATISTICS_COLLECTION_INTERVAL.name()
2259            || name == USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION.name()
2260            || is_upsert_rocksdb_config_var(name)
2261            || self.is_dyncfg_var(name)
2262            || is_tracing_var(name)
2263    }
2264
2265    /// Returns whether the named variable is a dyncfg configuration parameter.
2266    fn is_dyncfg_var(&self, name: &str) -> bool {
2267        self.dyncfgs.entries().any(|e| name == e.name())
2268    }
2269}
2270
2271pub fn is_tracing_var(name: &str) -> bool {
2272    name == LOGGING_FILTER.name()
2273        || name == LOGGING_FILTER_DEFAULTS.name()
2274        || name == OPENTELEMETRY_FILTER.name()
2275        || name == OPENTELEMETRY_FILTER_DEFAULTS.name()
2276        || name == SENTRY_FILTERS.name()
2277}
2278
2279/// Returns whether the named variable is a caching configuration parameter.
2280pub fn is_secrets_caching_var(name: &str) -> bool {
2281    name == WEBHOOKS_SECRETS_CACHING_TTL_SECS.name()
2282}
2283
2284fn is_upsert_rocksdb_config_var(name: &str) -> bool {
2285    name == upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE.name()
2286        || name == upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET.name()
2287        || name == upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES.name()
2288        || name == upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO.name()
2289        || name == upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM.name()
2290        || name == upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE.name()
2291        || name == upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE.name()
2292        || name == upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE.name()
2293        || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS.name()
2294        || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS.name()
2295        || name == upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB.name()
2296        || name == upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO.name()
2297}
2298
2299/// Returns whether the named variable is a (Postgres/CRDB) timestamp oracle
2300/// configuration parameter.
2301pub fn is_timestamp_oracle_config_var(name: &str) -> bool {
2302    name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE.name()
2303        || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT.name()
2304        || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL.name()
2305        || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER.name()
2306        || name == CRDB_CONNECT_TIMEOUT.name()
2307        || name == CRDB_TCP_USER_TIMEOUT.name()
2308        || name == CRDB_KEEPALIVES_IDLE.name()
2309        || name == CRDB_KEEPALIVES_INTERVAL.name()
2310        || name == CRDB_KEEPALIVES_RETRIES.name()
2311}
2312
2313/// Returns whether the named variable is a cluster scheduling config
2314pub fn is_cluster_scheduling_var(name: &str) -> bool {
2315    name == cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT.name()
2316        || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY.name()
2317        || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT.name()
2318        || name == cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD.name()
2319        || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE.name()
2320        || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW.name()
2321        || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT.name()
2322        || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY.name()
2323        || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT.name()
2324}
2325
2326/// Returns whether the named variable is an HTTP server related config var.
2327pub fn is_http_config_var(name: &str) -> bool {
2328    name == WEBHOOK_CONCURRENT_REQUEST_LIMIT.name()
2329}
2330
2331/// Set of [`SystemVar`]s that can also get set at a per-Session level.
2332///
2333/// TODO(parkmycar): Instead of a separate list, make this a field on VarDefinition.
2334static SESSION_SYSTEM_VARS: LazyLock<BTreeMap<&'static UncasedStr, &'static VarDefinition>> =
2335    LazyLock::new(|| {
2336        [
2337            &APPLICATION_NAME,
2338            &CLIENT_ENCODING,
2339            &CLIENT_MIN_MESSAGES,
2340            &CLUSTER,
2341            &CLUSTER_REPLICA,
2342            &DEFAULT_CLUSTER_REPLICATION_FACTOR,
2343            &CURRENT_OBJECT_MISSING_WARNINGS,
2344            &DATABASE,
2345            &DATE_STYLE,
2346            &EXTRA_FLOAT_DIGITS,
2347            &INTEGER_DATETIMES,
2348            &INTERVAL_STYLE,
2349            &REAL_TIME_RECENCY_TIMEOUT,
2350            &SEARCH_PATH,
2351            &STANDARD_CONFORMING_STRINGS,
2352            &STATEMENT_TIMEOUT,
2353            &IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
2354            &TIMEZONE,
2355            &TRANSACTION_ISOLATION,
2356            &MAX_QUERY_RESULT_SIZE,
2357        ]
2358        .into_iter()
2359        .map(|var| (UncasedStr::new(var.name()), var))
2360        .collect()
2361    });
2362
2363// Provides a wrapper to express that a particular `ServerVar` is meant to be used as a feature
2364/// flag.
2365#[derive(Debug)]
2366pub struct FeatureFlag {
2367    pub flag: &'static VarDefinition,
2368    pub feature_desc: &'static str,
2369}
2370
2371impl FeatureFlag {
2372    /// Returns an error unless the feature flag is enabled in the provided
2373    /// `system_vars`.
2374    pub fn require(&'static self, system_vars: &SystemVars) -> Result<(), VarError> {
2375        match *system_vars.expect_value::<bool>(self.flag) {
2376            true => Ok(()),
2377            false => Err(VarError::RequiresFeatureFlag { feature_flag: self }),
2378        }
2379    }
2380}
2381
2382impl PartialEq for FeatureFlag {
2383    fn eq(&self, other: &FeatureFlag) -> bool {
2384        self.flag.name() == other.flag.name()
2385    }
2386}
2387
2388impl Eq for FeatureFlag {}
2389
2390impl Var for MzVersion {
2391    fn name(&self) -> &'static str {
2392        MZ_VERSION_NAME.as_str()
2393    }
2394
2395    fn value(&self) -> String {
2396        self.build_info
2397            .human_version(self.helm_chart_version.clone())
2398    }
2399
2400    fn description(&self) -> &'static str {
2401        "Shows the Materialize server version (Materialize)."
2402    }
2403
2404    fn type_name(&self) -> Cow<'static, str> {
2405        String::type_name()
2406    }
2407
2408    fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2409        Ok(())
2410    }
2411}
2412
2413impl Var for User {
2414    fn name(&self) -> &'static str {
2415        IS_SUPERUSER_NAME.as_str()
2416    }
2417
2418    fn value(&self) -> String {
2419        self.is_superuser().format()
2420    }
2421
2422    fn description(&self) -> &'static str {
2423        "Reports whether the current session is a superuser (PostgreSQL)."
2424    }
2425
2426    fn type_name(&self) -> Cow<'static, str> {
2427        bool::type_name()
2428    }
2429
2430    fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2431        Ok(())
2432    }
2433}