Skip to main content

mz_sql/session/vars/
definitions.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Cow;
11use std::num::NonZeroU32;
12use std::str::FromStr;
13use std::sync::Arc;
14use std::sync::LazyLock;
15use std::time::Duration;
16
17use chrono::{DateTime, Utc};
18use derivative::Derivative;
19use mz_adapter_types::timestamp_oracle::{
20    DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT,
21    DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER,
22};
23use mz_dyncfg::ParameterScope;
24use mz_ore::cast::{self, CastFrom};
25use mz_repr::adt::numeric::Numeric;
26use mz_repr::adt::timestamp::CheckedTimestamp;
27use mz_repr::bytes::ByteSize;
28use mz_repr::optimize::OptimizerFeatures;
29use mz_sql_parser::ast::Ident;
30use mz_sql_parser::ident;
31use mz_storage_types::parameters::REPLICA_STATUS_HISTORY_RETENTION_WINDOW_DEFAULT;
32use mz_storage_types::parameters::{
33    DEFAULT_PG_SOURCE_CONNECT_TIMEOUT, DEFAULT_PG_SOURCE_TCP_CONFIGURE_SERVER,
34    DEFAULT_PG_SOURCE_TCP_KEEPALIVES_IDLE, DEFAULT_PG_SOURCE_TCP_KEEPALIVES_INTERVAL,
35    DEFAULT_PG_SOURCE_TCP_KEEPALIVES_RETRIES, DEFAULT_PG_SOURCE_TCP_USER_TIMEOUT,
36    DEFAULT_PG_SOURCE_WAL_SENDER_TIMEOUT, STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT,
37};
38use mz_tracing::{CloneableEnvFilter, SerializableDirective};
39use uncased::UncasedStr;
40
41use crate::session::user::{SUPPORT_USER, SYSTEM_USER, User};
42use crate::session::vars::constraints::{
43    BYTESIZE_AT_LEAST_1MB, DomainConstraint, NON_ZERO_DURATION, NUMERIC_BOUNDED_0_1_INCLUSIVE,
44    NUMERIC_NON_NEGATIVE, ValueConstraint,
45};
46use crate::session::vars::errors::VarError;
47use crate::session::vars::polyfill::{LazyValueFn, lazy_value, value};
48use crate::session::vars::value::{
49    ClientEncoding, ClientSeverity, DEFAULT_DATE_STYLE, Failpoints, IntervalStyle, IsolationLevel,
50    TimeZone, Value,
51};
52use crate::session::vars::{FeatureFlag, Var, VarInput, VarParseError};
53use crate::{DEFAULT_SCHEMA, WEBHOOK_CONCURRENCY_LIMIT};
54
55/// Definition of a variable.
56#[derive(Clone, Derivative)]
57#[derivative(Debug)]
58pub struct VarDefinition {
59    /// Name of the variable, case-insensitive matching.
60    pub name: &'static UncasedStr,
61    /// Description of the variable.
62    pub description: &'static str,
63    /// Is the variable visible to users, when false only visible to system users.
64    pub user_visible: bool,
65
66    /// Default compiled in value for this variable.
67    pub value: VarDefaultValue,
68    /// Constraint that must be upheld for this variable to be valid.
69    pub constraint: Option<ValueConstraint>,
70    /// When set, prevents getting or setting the variable unless the specified
71    /// feature flag is enabled.
72    pub require_feature_flag: Option<&'static FeatureFlag>,
73    /// The scope at which this variable's value may be overridden by the
74    /// LaunchDarkly sync loop.
75    pub scope: ParameterScope,
76
77    /// Method to parse [`VarInput`] into a type that implements [`Value`].
78    ///
79    /// The reason `parse` exists as a function pointer is because we want to achieve two things:
80    ///   1. `VarDefinition` has no generic parameters.
81    ///   2. `Value::parse` returns an instance of `Self`.
82    /// `VarDefinition` holds a `dyn Value`, but `Value::parse` is not object safe because it
83    /// returns `Self`, so we can't call that method. We could change `Value::parse` to return a
84    /// `Box<dyn Value>` making it object safe, but that creates a footgun where it's possible for
85    /// `Value::parse` to return a type that isn't `Self`, e.g. `<String as Value>::parse` could
86    /// return a `usize`!
87    ///
88    /// So to prevent making `VarDefinition` generic over some type `V: Value`, but also defining
89    /// `Value::parse` as returning `Self`, we store a static function pointer to the `parse`
90    /// implementation of our default value.
91    #[derivative(Debug = "ignore")]
92    parse: fn(VarInput) -> Result<Box<dyn Value>, VarParseError>,
93    /// Returns a human readable name for the type of this variable. We store this as a static
94    /// function pointer for the same reason as `parse`.
95    #[derivative(Debug = "ignore")]
96    type_name: fn() -> Cow<'static, str>,
97}
98static_assertions::assert_impl_all!(VarDefinition: Send, Sync);
99
100impl VarDefinition {
101    /// Create a new [`VarDefinition`] in a const context with a value known at compile time.
102    pub const fn new<V: Value>(
103        name: &'static str,
104        value: &'static V,
105        description: &'static str,
106        user_visible: bool,
107    ) -> Self {
108        VarDefinition {
109            name: UncasedStr::new(name),
110            description,
111            value: VarDefaultValue::Static(value),
112            user_visible,
113            parse: V::parse_dyn_value,
114            type_name: V::type_name,
115            constraint: None,
116            require_feature_flag: None,
117            scope: ParameterScope::DEFAULT,
118        }
119    }
120
121    /// Create a new [`VarDefinition`] in a const context with a lazily evaluated value.
122    pub const fn new_lazy<V: Value, L: LazyValueFn<V>>(
123        name: &'static str,
124        _value: L,
125        description: &'static str,
126        user_visible: bool,
127    ) -> Self {
128        VarDefinition {
129            name: UncasedStr::new(name),
130            description,
131            value: VarDefaultValue::Lazy(L::LAZY_VALUE_FN),
132            user_visible,
133            parse: V::parse_dyn_value,
134            type_name: V::type_name,
135            constraint: None,
136            require_feature_flag: None,
137            scope: ParameterScope::DEFAULT,
138        }
139    }
140
141    /// Create a new [`VarDefinition`] with a value known at runtime.
142    pub fn new_runtime<V: Value>(
143        name: &'static str,
144        value: V,
145        description: &'static str,
146        user_visible: bool,
147    ) -> Self {
148        VarDefinition {
149            name: UncasedStr::new(name),
150            description,
151            value: VarDefaultValue::Runtime(Arc::new(value)),
152            user_visible,
153            parse: V::parse_dyn_value,
154            type_name: V::type_name,
155            constraint: None,
156            require_feature_flag: None,
157            scope: ParameterScope::DEFAULT,
158        }
159    }
160
161    /// TODO(parkmycar): Refactor this method onto a `VarDefinitionBuilder` that would allow us to
162    /// constrain `V` here to be the same `V` used in [`VarDefinition::new`].
163    pub const fn with_constraint<V: Value, D: DomainConstraint<Value = V>>(
164        mut self,
165        constraint: &'static D,
166    ) -> Self {
167        self.constraint = Some(ValueConstraint::Domain(constraint));
168        self
169    }
170
171    pub const fn fixed(mut self) -> Self {
172        self.constraint = Some(ValueConstraint::Fixed);
173        self
174    }
175
176    pub const fn read_only(mut self) -> Self {
177        self.constraint = Some(ValueConstraint::ReadOnly);
178        self
179    }
180
181    pub const fn with_feature_flag(mut self, feature_flag: &'static FeatureFlag) -> Self {
182        self.require_feature_flag = Some(feature_flag);
183        self
184    }
185
186    /// Declares the [`ParameterScope`] of this variable, overriding the
187    /// [default](ParameterScope::DEFAULT). See [`ParameterScope`] for the
188    /// semantics of each scope class.
189    pub const fn scoped(mut self, scope: ParameterScope) -> Self {
190        self.scope = scope;
191        self
192    }
193
194    pub fn parse(&self, input: VarInput) -> Result<Box<dyn Value>, VarError> {
195        (self.parse)(input).map_err(|err| err.into_var_error(self))
196    }
197
198    pub fn default_value(&self) -> &'_ dyn Value {
199        self.value.value()
200    }
201}
202
203impl Var for VarDefinition {
204    fn name(&self) -> &'static str {
205        self.name.as_str()
206    }
207
208    fn value(&self) -> String {
209        self.default_value().format()
210    }
211
212    fn description(&self) -> &'static str {
213        self.description
214    }
215
216    fn type_name(&self) -> Cow<'static, str> {
217        (self.type_name)()
218    }
219
220    fn scope(&self) -> ParameterScope {
221        self.scope
222    }
223
224    fn visible(&self, user: &User, system_vars: &super::SystemVars) -> Result<(), VarError> {
225        if !self.user_visible && user != &*SYSTEM_USER && user != &*SUPPORT_USER {
226            Err(VarError::UnknownParameter(self.name().to_string()))
227        } else if self.is_unsafe() && !system_vars.allow_unsafe() {
228            Err(VarError::RequiresUnsafeMode(self.name()))
229        } else {
230            if let Some(flag) = self.require_feature_flag {
231                flag.require(system_vars)?;
232            }
233
234            Ok(())
235        }
236    }
237}
238
239/// The kinds of compiled in default values that can be used with [`VarDefinition`].
240#[derive(Clone, Debug)]
241pub enum VarDefaultValue {
242    /// Static that can be evaluated at compile time.
243    Static(&'static dyn Value),
244    /// Lazy value that is defined at compile time, but created at runtime.
245    Lazy(fn() -> &'static dyn Value),
246    /// Value created at runtime. Note: This is generally an escape hatch.
247    Runtime(Arc<dyn Value>),
248}
249
250impl VarDefaultValue {
251    pub fn value(&self) -> &'_ dyn Value {
252        match self {
253            VarDefaultValue::Static(s) => *s,
254            VarDefaultValue::Lazy(l) => (l)(),
255            VarDefaultValue::Runtime(r) => r.as_ref(),
256        }
257    }
258}
259
260// We pretend to be Postgres v9.5.0, which is also what CockroachDB pretends to
261// be. Too new and some clients will emit a "server too new" warning. Too old
262// and some clients will fall back to legacy code paths. v9.5.0 empirically
263// seems to be a good compromise.
264
265/// The major version of PostgreSQL that Materialize claims to be.
266pub const SERVER_MAJOR_VERSION: u8 = 9;
267
268/// The minor version of PostgreSQL that Materialize claims to be.
269pub const SERVER_MINOR_VERSION: u8 = 5;
270
271/// The patch version of PostgreSQL that Materialize claims to be.
272pub const SERVER_PATCH_VERSION: u8 = 0;
273
274/// The name of the default database that Materialize uses.
275pub const DEFAULT_DATABASE_NAME: &str = "materialize";
276
277pub static APPLICATION_NAME: VarDefinition = VarDefinition::new(
278    "application_name",
279    value!(String; String::new()),
280    "Sets the application name to be reported in statistics and logs (PostgreSQL).",
281    true,
282);
283
284pub static CLIENT_ENCODING: VarDefinition = VarDefinition::new(
285    "client_encoding",
286    value!(ClientEncoding; ClientEncoding::Utf8),
287    "Sets the client's character set encoding (PostgreSQL).",
288    true,
289);
290
291pub static CLIENT_MIN_MESSAGES: VarDefinition = VarDefinition::new(
292    "client_min_messages",
293    value!(ClientSeverity; ClientSeverity::Notice),
294    "Sets the message levels that are sent to the client (PostgreSQL).",
295    true,
296);
297
298pub static CLUSTER: VarDefinition = VarDefinition::new_lazy(
299    "cluster",
300    lazy_value!(String; || "quickstart".to_string()),
301    "Sets the current cluster (Materialize).",
302    true,
303);
304
305pub static CLUSTER_REPLICA: VarDefinition = VarDefinition::new(
306    "cluster_replica",
307    value!(Option<String>; None),
308    "Sets a target cluster replica for SELECT queries (Materialize).",
309    true,
310);
311
312pub static CURRENT_OBJECT_MISSING_WARNINGS: VarDefinition = VarDefinition::new(
313    "current_object_missing_warnings",
314    value!(bool; true),
315    "Whether to emit warnings when the current database, schema, or cluster is missing (Materialize).",
316    true,
317);
318
319pub static DATABASE: VarDefinition = VarDefinition::new_lazy(
320    "database",
321    lazy_value!(String; || DEFAULT_DATABASE_NAME.to_string()),
322    "Sets the current database (CockroachDB).",
323    true,
324);
325
326pub static DATE_STYLE: VarDefinition = VarDefinition::new(
327    // DateStyle has nonstandard capitalization for historical reasons.
328    "DateStyle",
329    &DEFAULT_DATE_STYLE,
330    "Sets the display format for date and time values (PostgreSQL).",
331    true,
332);
333
334pub static DEFAULT_CLUSTER_REPLICATION_FACTOR: VarDefinition = VarDefinition::new(
335    "default_cluster_replication_factor",
336    value!(u32; 1),
337    "Default cluster replication factor (Materialize).",
338    true,
339);
340
341pub static EXTRA_FLOAT_DIGITS: VarDefinition = VarDefinition::new(
342    "extra_float_digits",
343    value!(i32; 3),
344    "Adjusts the number of digits displayed for floating-point values (PostgreSQL).",
345    true,
346);
347
348pub static FAILPOINTS: VarDefinition = VarDefinition::new(
349    "failpoints",
350    value!(Failpoints; Failpoints),
351    "Allows failpoints to be dynamically activated.",
352    true,
353);
354
355pub static INTEGER_DATETIMES: VarDefinition = VarDefinition::new(
356    "integer_datetimes",
357    value!(bool; true),
358    "Reports whether the server uses 64-bit-integer dates and times (PostgreSQL).",
359    true,
360)
361.fixed();
362
363pub static INTERVAL_STYLE: VarDefinition = VarDefinition::new(
364    // IntervalStyle has nonstandard capitalization for historical reasons.
365    "IntervalStyle",
366    value!(IntervalStyle; IntervalStyle::Postgres),
367    "Sets the display format for interval values (PostgreSQL).",
368    true,
369);
370
371pub const MZ_VERSION_NAME: &UncasedStr = UncasedStr::new("mz_version");
372pub const IS_SUPERUSER_NAME: &UncasedStr = UncasedStr::new("is_superuser");
373
374// Schema can be used an alias for a search path with a single element.
375pub const SCHEMA_ALIAS: &UncasedStr = UncasedStr::new("schema");
376pub static SEARCH_PATH: VarDefinition = VarDefinition::new_lazy(
377    "search_path",
378    lazy_value!(Vec<Ident>; || vec![ident!(DEFAULT_SCHEMA)]),
379    "Sets the schema search order for names that are not schema-qualified (PostgreSQL).",
380    true,
381);
382
383pub static STATEMENT_TIMEOUT: VarDefinition = VarDefinition::new(
384    "statement_timeout",
385    value!(Duration; Duration::from_secs(60)),
386    "Sets the maximum allowed duration of INSERT...SELECT, UPDATE, and DELETE operations. \
387    If this value is specified without units, it is taken as milliseconds.",
388    true,
389);
390
391pub static IDLE_IN_TRANSACTION_SESSION_TIMEOUT: VarDefinition = VarDefinition::new(
392    "idle_in_transaction_session_timeout",
393    value!(Duration; Duration::from_secs(60 * 2)),
394    "Sets the maximum allowed duration that a session can sit idle in a transaction before \
395    being terminated. If this value is specified without units, it is taken as milliseconds. \
396    A value of zero disables the timeout (PostgreSQL).",
397    true,
398);
399
400pub static SERVER_VERSION: VarDefinition = VarDefinition::new_lazy(
401    "server_version",
402    lazy_value!(String; || {
403        format!("{SERVER_MAJOR_VERSION}.{SERVER_MINOR_VERSION}.{SERVER_PATCH_VERSION}")
404    }),
405    "Shows the PostgreSQL compatible server version (PostgreSQL).",
406    true,
407)
408.read_only();
409
410pub static SERVER_VERSION_NUM: VarDefinition = VarDefinition::new(
411    "server_version_num",
412    value!(i32; (cast::u8_to_i32(SERVER_MAJOR_VERSION) * 10_000)
413        + (cast::u8_to_i32(SERVER_MINOR_VERSION) * 100)
414        + cast::u8_to_i32(SERVER_PATCH_VERSION)),
415    "Shows the PostgreSQL compatible server version as an integer (PostgreSQL).",
416    true,
417)
418.read_only();
419
420pub static SQL_SAFE_UPDATES: VarDefinition = VarDefinition::new(
421    "sql_safe_updates",
422    value!(bool; false),
423    "Prohibits SQL statements that may be overly destructive (CockroachDB).",
424    true,
425);
426
427pub static STANDARD_CONFORMING_STRINGS: VarDefinition = VarDefinition::new(
428    "standard_conforming_strings",
429    value!(bool; true),
430    "Causes '...' strings to treat backslashes literally (PostgreSQL).",
431    true,
432)
433.fixed();
434
435pub static TIMEZONE: VarDefinition = VarDefinition::new(
436    // TimeZone has nonstandard capitalization for historical reasons.
437    "TimeZone",
438    value!(TimeZone; TimeZone::UTC),
439    "Sets the time zone for displaying and interpreting time stamps (PostgreSQL).",
440    true,
441);
442
443pub const TRANSACTION_ISOLATION_VAR_NAME: &str = "transaction_isolation";
444pub static TRANSACTION_ISOLATION: VarDefinition = VarDefinition::new(
445    TRANSACTION_ISOLATION_VAR_NAME,
446    value!(IsolationLevel; IsolationLevel::StrictSerializable),
447    "Sets the current transaction's isolation level (PostgreSQL).",
448    true,
449);
450
451pub static MAX_KAFKA_CONNECTIONS: VarDefinition = VarDefinition::new(
452    "max_kafka_connections",
453    value!(u32; 1000),
454    "The maximum number of Kafka connections in the region, across all schemas (Materialize).",
455    true,
456);
457
458pub static MAX_POSTGRES_CONNECTIONS: VarDefinition = VarDefinition::new(
459    "max_postgres_connections",
460    value!(u32; 1000),
461    "The maximum number of PostgreSQL connections in the region, across all schemas (Materialize).",
462    true,
463);
464
465pub static MAX_MYSQL_CONNECTIONS: VarDefinition = VarDefinition::new(
466    "max_mysql_connections",
467    value!(u32; 1000),
468    "The maximum number of MySQL connections in the region, across all schemas (Materialize).",
469    true,
470);
471
472pub static MAX_SQL_SERVER_CONNECTIONS: VarDefinition = VarDefinition::new(
473    "max_sql_server_connections",
474    value!(u32; 1000),
475    "The maximum number of SQL Server connections in the region, across all schemas (Materialize).",
476    true,
477);
478
479pub static MAX_AWS_PRIVATELINK_CONNECTIONS: VarDefinition = VarDefinition::new(
480    "max_aws_privatelink_connections",
481    value!(u32; 0),
482    "The maximum number of AWS PrivateLink connections in the region, across all schemas (Materialize).",
483    true,
484);
485
486pub static MAX_TABLES: VarDefinition = VarDefinition::new(
487    "max_tables",
488    value!(u32; 200),
489    "The maximum number of tables in the region, across all schemas (Materialize).",
490    true,
491);
492
493pub static MAX_SOURCES: VarDefinition = VarDefinition::new(
494    "max_sources",
495    value!(u32; 200),
496    "The maximum number of sources in the region, across all schemas (Materialize).",
497    true,
498);
499
500pub static MAX_SINKS: VarDefinition = VarDefinition::new(
501    "max_sinks",
502    value!(u32; 1000),
503    "The maximum number of sinks in the region, across all schemas (Materialize).",
504    true,
505);
506
507pub static MAX_MATERIALIZED_VIEWS: VarDefinition = VarDefinition::new(
508    "max_materialized_views",
509    value!(u32; 500),
510    "The maximum number of materialized views in the region, across all schemas (Materialize).",
511    true,
512);
513
514pub static MAX_CLUSTERS: VarDefinition = VarDefinition::new(
515    "max_clusters",
516    value!(u32; 25),
517    "The maximum number of clusters in the region (Materialize).",
518    true,
519);
520
521pub static MAX_REPLICAS_PER_CLUSTER: VarDefinition = VarDefinition::new(
522    "max_replicas_per_cluster",
523    value!(u32; 5),
524    "The maximum number of replicas of a single cluster (Materialize).",
525    true,
526);
527
528pub static MAX_CREDIT_CONSUMPTION_RATE: VarDefinition = VarDefinition::new_lazy(
529    "max_credit_consumption_rate",
530    lazy_value!(Numeric; || 1024.into()),
531    "The maximum rate of credit consumption in a region. Credits are consumed based on the size of cluster replicas in use (Materialize).",
532    true,
533)
534.with_constraint(&NUMERIC_NON_NEGATIVE);
535
536pub static MAX_DATABASES: VarDefinition = VarDefinition::new(
537    "max_databases",
538    value!(u32; 1000),
539    "The maximum number of databases in the region (Materialize).",
540    true,
541);
542
543pub static MAX_SCHEMAS_PER_DATABASE: VarDefinition = VarDefinition::new(
544    "max_schemas_per_database",
545    value!(u32; 1000),
546    "The maximum number of schemas in a database (Materialize).",
547    true,
548);
549
550pub static MAX_OBJECTS_PER_SCHEMA: VarDefinition = VarDefinition::new(
551    "max_objects_per_schema",
552    value!(u32; 1000),
553    "The maximum number of objects in a schema (Materialize).",
554    true,
555);
556
557pub static MAX_SECRETS: VarDefinition = VarDefinition::new(
558    "max_secrets",
559    value!(u32; 100),
560    "The maximum number of secrets in the region, across all schemas (Materialize).",
561    true,
562);
563
564pub static MAX_ROLES: VarDefinition = VarDefinition::new(
565    "max_roles",
566    value!(u32; 1000),
567    "The maximum number of roles in the region (Materialize).",
568    true,
569);
570
571pub static MAX_NETWORK_POLICIES: VarDefinition = VarDefinition::new(
572    "max_network_policies",
573    value!(u32; 25),
574    "The maximum number of network policies in the region.",
575    true,
576);
577
578pub static MAX_RULES_PER_NETWORK_POLICY: VarDefinition = VarDefinition::new(
579    "max_rules_per_network_policy",
580    value!(u32; 25),
581    "The maximum number of rules per network policies.",
582    true,
583);
584
585// Cloud environmentd is configured with 4 GiB of RAM, so 1 GiB is a good heuristic for a single
586// query.
587//
588// We constrain this parameter to a minimum of 1MB, to avoid accidental usage of values that will
589// interfere with queries executed by the system itself.
590//
591// TODO(jkosh44) Eventually we want to be able to return arbitrary sized results.
592pub static MAX_RESULT_SIZE: VarDefinition = VarDefinition::new(
593    "max_result_size",
594    value!(ByteSize; ByteSize::gb(1)),
595    "The maximum size in bytes for an internal query result (Materialize).",
596    true,
597)
598.with_constraint(&BYTESIZE_AT_LEAST_1MB);
599
600pub static MAX_QUERY_RESULT_SIZE: VarDefinition = VarDefinition::new(
601    "max_query_result_size",
602    value!(ByteSize; ByteSize::gb(1)),
603    "The maximum size in bytes for a single query's result (Materialize).",
604    true,
605);
606
607pub static MAX_COPY_FROM_ROW_SIZE: VarDefinition = VarDefinition::new(
608    "max_copy_from_row_size",
609    value!(ByteSize; ByteSize::mb(128)),
610    "The maximum size in bytes for a single COPY FROM STDIN row (Materialize).",
611    true,
612);
613
614pub static MAX_IDENTIFIER_LENGTH: VarDefinition = VarDefinition::new(
615    "max_identifier_length",
616    value!(usize; mz_sql_lexer::lexer::MAX_IDENTIFIER_LENGTH),
617    "The maximum length of object identifiers in bytes (PostgreSQL).",
618    true,
619);
620
621pub static WELCOME_MESSAGE: VarDefinition = VarDefinition::new(
622    "welcome_message",
623    value!(bool; true),
624    "Whether to send a notice with a welcome message after a successful connection (Materialize).",
625    true,
626);
627
628/// The logical compaction window for builtin tables and sources that have the
629/// `retained_metrics_relation` flag set.
630///
631/// The existence of this variable is a bit of a hack until we have a fully
632/// general solution for controlling retention windows.
633pub static METRICS_RETENTION: VarDefinition = VarDefinition::new(
634    "metrics_retention",
635    // 30 days
636    value!(Duration; Duration::from_secs(30 * 24 * 60 * 60)),
637    "The time to retain cluster utilization metrics (Materialize).",
638    false,
639);
640
641pub static ALLOWED_CLUSTER_REPLICA_SIZES: VarDefinition = VarDefinition::new(
642    "allowed_cluster_replica_sizes",
643    value!(Vec<Ident>; Vec::new()),
644    "The allowed sizes when creating a new cluster replica (Materialize).",
645    true,
646);
647
648pub static PERSIST_FAST_PATH_LIMIT: VarDefinition = VarDefinition::new(
649    "persist_fast_path_limit",
650    value!(usize; 25),
651    "An exclusive upper bound on the number of results we may return from a Persist fast-path peek; \
652    queries that may return more results will follow the normal / slow path. \
653    Setting this to 0 disables the feature.",
654    false,
655);
656
657/// Controls `mz_adapter::coord::timestamp_oracle::postgres_oracle::DynamicConfig::pg_connection_pool_max_size`.
658pub static PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE: VarDefinition = VarDefinition::new(
659    "pg_timestamp_oracle_connection_pool_max_size",
660    value!(usize; DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE),
661    "Maximum size of the Postgres/CRDB connection pool, used by the Postgres/CRDB timestamp oracle.",
662    false,
663);
664
665/// Controls `mz_adapter::coord::timestamp_oracle::postgres_oracle::DynamicConfig::pg_connection_pool_max_wait`.
666pub static PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT: VarDefinition = VarDefinition::new(
667    "pg_timestamp_oracle_connection_pool_max_wait",
668    value!(Option<Duration>; Some(DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT)),
669    "The maximum time to wait when attempting to obtain a connection from the Postgres/CRDB connection pool, used by the Postgres/CRDB timestamp oracle.",
670    false,
671);
672
673/// Controls `mz_adapter::coord::timestamp_oracle::postgres_oracle::DynamicConfig::pg_connection_pool_ttl`.
674pub static PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL: VarDefinition = VarDefinition::new(
675    "pg_timestamp_oracle_connection_pool_ttl",
676    value!(Duration; DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL),
677    "The minimum TTL of a Consensus connection to Postgres/CRDB before it is proactively terminated",
678    false,
679);
680
681/// Controls `mz_adapter::coord::timestamp_oracle::postgres_oracle::DynamicConfig::pg_connection_pool_ttl_stagger`.
682pub static PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER: VarDefinition = VarDefinition::new(
683    "pg_timestamp_oracle_connection_pool_ttl_stagger",
684    value!(Duration; DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER),
685    "The minimum time between TTLing Consensus connections to Postgres/CRDB.",
686    false,
687);
688
689pub static UNSAFE_NEW_TRANSACTION_WALL_TIME: VarDefinition = VarDefinition::new(
690    "unsafe_new_transaction_wall_time",
691    value!(Option<CheckedTimestamp<DateTime<Utc>>>; None),
692    "Sets the wall time for all new explicit or implicit transactions to control the value of `now()`. \
693    If not set, uses the system's clock.",
694    // This needs to be true because `user_visible: false` things are only modifiable by the mz_system
695    // and mz_support users, and we want sqllogictest to have access with its user. Because the name
696    // starts with "unsafe" it still won't be visible or changeable by users unless unsafe mode is
697    // enabled.
698    true,
699);
700
701pub static SCRAM_ITERATIONS: VarDefinition = VarDefinition::new(
702    "scram_iterations",
703    // / The default iteration count as suggested by
704    // / <https://cheatsheetseries.owasp.org/cheatsheets/Password_Storage_Cheat_Sheet.html>
705    value!(NonZeroU32; NonZeroU32::new(600_000).unwrap()),
706    "Iterations to use when hashing passwords. Higher iterations are more secure, but take longer to validated. \
707    Please consider the security risks before reducing this below the default value.",
708    true,
709);
710
711/// Tuning for RocksDB used by `UPSERT` sources that takes effect on restart.
712pub mod upsert_rocksdb {
713    use super::*;
714    use mz_rocksdb_types::config::{CompactionStyle, CompressionType};
715
716    pub static UPSERT_ROCKSDB_COMPACTION_STYLE: VarDefinition = VarDefinition::new(
717        "upsert_rocksdb_compaction_style",
718        value!(CompactionStyle; mz_rocksdb_types::defaults::DEFAULT_COMPACTION_STYLE),
719        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
720        sources. Described in the `mz_rocksdb_types::config` module. \
721        Only takes effect on source restart (Materialize).",
722        false,
723    );
724
725    pub static UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET: VarDefinition =
726        VarDefinition::new(
727            "upsert_rocksdb_optimize_compaction_memtable_budget",
728            value!(usize; mz_rocksdb_types::defaults::DEFAULT_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET),
729            "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
730        sources. Described in the `mz_rocksdb_types::config` module. \
731        Only takes effect on source restart (Materialize).",
732            false,
733        );
734
735    pub static UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES: VarDefinition =
736        VarDefinition::new(
737            "upsert_rocksdb_level_compaction_dynamic_level_bytes",
738            value!(bool; mz_rocksdb_types::defaults::DEFAULT_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES),
739            "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
740        sources. Described in the `mz_rocksdb_types::config` module. \
741        Only takes effect on source restart (Materialize).",
742            false,
743        );
744
745    pub static UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO: VarDefinition = VarDefinition::new(
746        "upsert_rocksdb_universal_compaction_ratio",
747        value!(i32; mz_rocksdb_types::defaults::DEFAULT_UNIVERSAL_COMPACTION_RATIO),
748        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
749        sources. Described in the `mz_rocksdb_types::config` module. \
750        Only takes effect on source restart (Materialize).",
751        false,
752    );
753
754    pub static UPSERT_ROCKSDB_PARALLELISM: VarDefinition = VarDefinition::new(
755        "upsert_rocksdb_parallelism",
756        value!(Option<i32>; mz_rocksdb_types::defaults::DEFAULT_PARALLELISM),
757        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
758        sources. Described in the `mz_rocksdb_types::config` module. \
759        Only takes effect on source restart (Materialize).",
760        false,
761    );
762
763    pub static UPSERT_ROCKSDB_COMPRESSION_TYPE: VarDefinition = VarDefinition::new(
764        "upsert_rocksdb_compression_type",
765        value!(CompressionType; mz_rocksdb_types::defaults::DEFAULT_COMPRESSION_TYPE),
766        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
767        sources. Described in the `mz_rocksdb_types::config` module. \
768        Only takes effect on source restart (Materialize).",
769        false,
770    );
771
772    pub static UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE: VarDefinition = VarDefinition::new(
773        "upsert_rocksdb_bottommost_compression_type",
774        value!(CompressionType; mz_rocksdb_types::defaults::DEFAULT_BOTTOMMOST_COMPRESSION_TYPE),
775        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
776        sources. Described in the `mz_rocksdb_types::config` module. \
777        Only takes effect on source restart (Materialize).",
778        false,
779    );
780
781    pub static UPSERT_ROCKSDB_BATCH_SIZE: VarDefinition = VarDefinition::new(
782        "upsert_rocksdb_batch_size",
783        value!(usize; mz_rocksdb_types::defaults::DEFAULT_BATCH_SIZE),
784        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
785        sources. Described in the `mz_rocksdb_types::config` module. \
786        Can be changed dynamically (Materialize).",
787        false,
788    );
789
790    pub static UPSERT_ROCKSDB_RETRY_DURATION: VarDefinition = VarDefinition::new(
791        "upsert_rocksdb_retry_duration",
792        value!(Duration; mz_rocksdb_types::defaults::DEFAULT_RETRY_DURATION),
793        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
794        sources. Described in the `mz_rocksdb_types::config` module. \
795        Only takes effect on source restart (Materialize).",
796        false,
797    );
798
799    pub static UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS: VarDefinition = VarDefinition::new(
800        "upsert_rocksdb_stats_log_interval_seconds",
801        value!(u32; mz_rocksdb_types::defaults::DEFAULT_STATS_LOG_INTERVAL_S),
802        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
803        sources. Described in the `mz_rocksdb_types::config` module. \
804        Only takes effect on source restart (Materialize).",
805        false,
806    );
807
808    pub static UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS: VarDefinition = VarDefinition::new(
809        "upsert_rocksdb_stats_persist_interval_seconds",
810        value!(u32; mz_rocksdb_types::defaults::DEFAULT_STATS_PERSIST_INTERVAL_S),
811        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
812        sources. Described in the `mz_rocksdb_types::config` module. \
813        Only takes effect on source restart (Materialize).",
814        false,
815    );
816
817    pub static UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB: VarDefinition = VarDefinition::new(
818        "upsert_rocksdb_point_lookup_block_cache_size_mb",
819        value!(Option<u32>; None),
820        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
821        sources. Described in the `mz_rocksdb_types::config` module. \
822        Only takes effect on source restart (Materialize).",
823        false,
824    );
825
826    /// The number of times by which allocated buffers will be shrinked in upsert rocksdb.
827    /// If value is 0, then no shrinking will occur.
828    pub static UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO: VarDefinition = VarDefinition::new(
829        "upsert_rocksdb_shrink_allocated_buffers_by_ratio",
830        value!(usize; mz_rocksdb_types::defaults::DEFAULT_SHRINK_BUFFERS_BY_RATIO),
831        "The number of times by which allocated buffers will be shrinked in upsert rocksdb.",
832        false,
833    );
834
835    /// Only used if `upsert_rocksdb_write_buffer_manager_memory_bytes` is also set
836    /// and write buffer manager is enabled
837    pub static UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION: VarDefinition =
838        VarDefinition::new(
839            "upsert_rocksdb_write_buffer_manager_cluster_memory_fraction",
840            value!(Option<Numeric>; None),
841            "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
842        sources. Described in the `mz_rocksdb_types::config` module. \
843        Only takes effect on source restart (Materialize).",
844            false,
845        );
846
847    /// `upsert_rocksdb_write_buffer_manager_memory_bytes` needs to be set for write buffer manager to be
848    /// used.
849    pub static UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES: VarDefinition = VarDefinition::new(
850        "upsert_rocksdb_write_buffer_manager_memory_bytes",
851        value!(Option<usize>; None),
852        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
853        sources. Described in the `mz_rocksdb_types::config` module. \
854        Only takes effect on source restart (Materialize).",
855        false,
856    );
857
858    pub static UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL: VarDefinition = VarDefinition::new(
859        "upsert_rocksdb_write_buffer_manager_allow_stall",
860        value!(bool; false),
861        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
862        sources. Described in the `mz_rocksdb_types::config` module. \
863        Only takes effect on source restart (Materialize).",
864        false,
865    );
866}
867
868pub static LOGGING_FILTER: VarDefinition = VarDefinition::new_lazy(
869    "log_filter",
870    lazy_value!(CloneableEnvFilter; || CloneableEnvFilter::from_str("info").expect("valid EnvFilter")),
871    "Sets the filter to apply to stderr logging.",
872    false,
873);
874
875pub static OPENTELEMETRY_FILTER: VarDefinition = VarDefinition::new_lazy(
876    "opentelemetry_filter",
877    lazy_value!(CloneableEnvFilter; || CloneableEnvFilter::from_str("info").expect("valid EnvFilter")),
878    "Sets the filter to apply to OpenTelemetry-backed distributed tracing.",
879    false,
880);
881
882pub static LOGGING_FILTER_DEFAULTS: VarDefinition = VarDefinition::new_lazy(
883    "log_filter_defaults",
884    lazy_value!(Vec<SerializableDirective>; || {
885        mz_ore::tracing::LOGGING_DEFAULTS
886            .iter()
887            .map(|d| d.clone().into())
888            .collect()
889    }),
890    "Sets additional default directives to apply to stderr logging. \
891        These apply to all variations of `log_filter`. Directives other than \
892        `module=off` are likely incorrect.",
893    false,
894);
895
896pub static OPENTELEMETRY_FILTER_DEFAULTS: VarDefinition = VarDefinition::new_lazy(
897    "opentelemetry_filter_defaults",
898    lazy_value!(Vec<SerializableDirective>; || {
899        mz_ore::tracing::OPENTELEMETRY_DEFAULTS
900            .iter()
901            .map(|d| d.clone().into())
902            .collect()
903    }),
904    "Sets additional default directives to apply to OpenTelemetry-backed \
905        distributed tracing. \
906        These apply to all variations of `opentelemetry_filter`. Directives other than \
907        `module=off` are likely incorrect.",
908    false,
909);
910
911pub static SENTRY_FILTERS: VarDefinition = VarDefinition::new_lazy(
912    "sentry_filters",
913    lazy_value!(Vec<SerializableDirective>; || {
914        mz_ore::tracing::SENTRY_DEFAULTS
915            .iter()
916            .map(|d| d.clone().into())
917            .collect()
918    }),
919    "Sets additional default directives to apply to sentry logging. \
920        These apply on top of a default `info` directive. Directives other than \
921        `module=off` are likely incorrect.",
922    false,
923);
924
925pub static WEBHOOKS_SECRETS_CACHING_TTL_SECS: VarDefinition = VarDefinition::new_lazy(
926    "webhooks_secrets_caching_ttl_secs",
927    lazy_value!(usize; || {
928        usize::cast_from(mz_secrets::cache::DEFAULT_TTL_SECS)
929    }),
930    "Sets the time-to-live for values in the Webhooks secrets cache.",
931    false,
932);
933
934pub static COORD_SLOW_MESSAGE_WARN_THRESHOLD: VarDefinition = VarDefinition::new(
935    "coord_slow_message_warn_threshold",
936    value!(Duration; Duration::from_secs(30)),
937    "Sets the threshold at which we will error! for a coordinator message being slow.",
938    false,
939);
940
941/// Controls the connect_timeout setting when connecting to PG via `mz_postgres_util`.
942pub static PG_SOURCE_CONNECT_TIMEOUT: VarDefinition = VarDefinition::new(
943    "pg_source_connect_timeout",
944    value!(Duration; DEFAULT_PG_SOURCE_CONNECT_TIMEOUT),
945    "Sets the timeout applied to socket-level connection attempts for PG \
946    replication connections (Materialize).",
947    false,
948);
949
950/// Sets the maximum number of TCP keepalive probes that will be sent before dropping a connection
951/// when connecting to PG via `mz_postgres_util`.
952pub static PG_SOURCE_TCP_KEEPALIVES_RETRIES: VarDefinition = VarDefinition::new(
953    "pg_source_tcp_keepalives_retries",
954    value!(u32; DEFAULT_PG_SOURCE_TCP_KEEPALIVES_RETRIES),
955    "Sets the maximum number of TCP keepalive probes that will be sent before dropping \
956    a connection when connecting to PG via `mz_postgres_util` (Materialize).",
957    false,
958);
959
960/// Sets the amount of idle time before a keepalive packet is sent on the connection when connecting
961/// to PG via `mz_postgres_util`.
962pub static PG_SOURCE_TCP_KEEPALIVES_IDLE: VarDefinition = VarDefinition::new(
963    "pg_source_tcp_keepalives_idle",
964    value!(Duration; DEFAULT_PG_SOURCE_TCP_KEEPALIVES_IDLE),
965    "Sets the amount of idle time before a keepalive packet is sent on the connection \
966        when connecting to PG via `mz_postgres_util` (Materialize).",
967    false,
968);
969
970/// Sets the time interval between TCP keepalive probes when connecting to PG via `mz_postgres_util`.
971pub static PG_SOURCE_TCP_KEEPALIVES_INTERVAL: VarDefinition = VarDefinition::new(
972    "pg_source_tcp_keepalives_interval",
973    value!(Duration; DEFAULT_PG_SOURCE_TCP_KEEPALIVES_INTERVAL),
974    "Sets the time interval between TCP keepalive probes when connecting to PG via \
975        replication (Materialize).",
976    false,
977);
978
979/// Sets the TCP user timeout when connecting to PG via `mz_postgres_util`.
980pub static PG_SOURCE_TCP_USER_TIMEOUT: VarDefinition = VarDefinition::new(
981    "pg_source_tcp_user_timeout",
982    value!(Duration; DEFAULT_PG_SOURCE_TCP_USER_TIMEOUT),
983    "Sets the TCP user timeout when connecting to PG via `mz_postgres_util` (Materialize).",
984    false,
985);
986
987/// Sets whether to apply the TCP configuration parameters on the server when
988/// connecting to PG via `mz_postgres_util`.
989pub static PG_SOURCE_TCP_CONFIGURE_SERVER: VarDefinition = VarDefinition::new(
990    "pg_source_tcp_configure_server",
991    value!(bool; DEFAULT_PG_SOURCE_TCP_CONFIGURE_SERVER),
992    "Sets whether to apply the TCP configuration parameters on the server when connecting to PG via `mz_postgres_util` (Materialize).",
993    false,
994);
995
996/// Sets the `statement_timeout` value to use during the snapshotting phase of
997/// PG sources.
998pub static PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT: VarDefinition = VarDefinition::new(
999    "pg_source_snapshot_statement_timeout",
1000    value!(Duration; mz_postgres_util::DEFAULT_SNAPSHOT_STATEMENT_TIMEOUT),
1001    "Sets the `statement_timeout` value to use during the snapshotting phase of PG sources (Materialize)",
1002    false,
1003);
1004
1005/// Sets the `wal_sender_timeout` value to use during the replication phase of
1006/// PG sources.
1007pub static PG_SOURCE_WAL_SENDER_TIMEOUT: VarDefinition = VarDefinition::new(
1008    "pg_source_wal_sender_timeout",
1009    value!(Option<Duration>; DEFAULT_PG_SOURCE_WAL_SENDER_TIMEOUT),
1010    "Sets the `wal_sender_timeout` value to use during the replication phase of PG sources (Materialize)",
1011    false,
1012);
1013
1014/// Please see `PgSourceSnapshotConfig`.
1015pub static PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT: VarDefinition = VarDefinition::new(
1016    "pg_source_snapshot_collect_strict_count",
1017    value!(bool; mz_storage_types::parameters::PgSourceSnapshotConfig::new().collect_strict_count),
1018    "Please see <https://dev.materialize.com/api/rust-private\
1019        /mz_storage_types/parameters\
1020        /struct.PgSourceSnapshotConfig.html#structfield.collect_strict_count>",
1021    false,
1022);
1023
1024/// Sets the time between TCP keepalive probes when connecting to MySQL via `mz_mysql_util`.
1025pub static MYSQL_SOURCE_TCP_KEEPALIVE: VarDefinition = VarDefinition::new(
1026    "mysql_source_tcp_keepalive",
1027    value!(Duration; mz_mysql_util::DEFAULT_TCP_KEEPALIVE),
1028    "Sets the time between TCP keepalive probes when connecting to MySQL",
1029    false,
1030);
1031
1032/// Sets the `max_execution_time` value to use during the snapshotting phase of
1033/// MySQL sources.
1034pub static MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME: VarDefinition = VarDefinition::new(
1035    "mysql_source_snapshot_max_execution_time",
1036    value!(Duration; mz_mysql_util::DEFAULT_SNAPSHOT_MAX_EXECUTION_TIME),
1037    "Sets the `max_execution_time` value to use during the snapshotting phase of MySQL sources (Materialize)",
1038    false,
1039);
1040
1041/// Sets the `lock_wait_timeout` value to use during the snapshotting phase of
1042/// MySQL sources.
1043pub static MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT: VarDefinition = VarDefinition::new(
1044    "mysql_source_snapshot_lock_wait_timeout",
1045    value!(Duration; mz_mysql_util::DEFAULT_SNAPSHOT_LOCK_WAIT_TIMEOUT),
1046    "Sets the `lock_wait_timeout` value to use during the snapshotting phase of MySQL sources (Materialize)",
1047    false,
1048);
1049
1050/// Sets the timeout for establishing an authenticated connection to MySQL
1051pub static MYSQL_SOURCE_CONNECT_TIMEOUT: VarDefinition = VarDefinition::new(
1052    "mysql_source_connect_timeout",
1053    value!(Duration; mz_mysql_util::DEFAULT_CONNECT_TIMEOUT),
1054    "Sets the timeout for establishing an authenticated connection to MySQL",
1055    false,
1056);
1057
1058/// Controls the check interval for connections to SSH bastions via `mz_ssh_util`.
1059pub static SSH_CHECK_INTERVAL: VarDefinition = VarDefinition::new(
1060    "ssh_check_interval",
1061    value!(Duration; mz_ssh_util::tunnel::DEFAULT_CHECK_INTERVAL),
1062    "Controls the check interval for connections to SSH bastions via `mz_ssh_util`.",
1063    false,
1064);
1065
1066/// Controls the connect timeout for connections to SSH bastions via `mz_ssh_util`.
1067pub static SSH_CONNECT_TIMEOUT: VarDefinition = VarDefinition::new(
1068    "ssh_connect_timeout",
1069    value!(Duration; mz_ssh_util::tunnel::DEFAULT_CONNECT_TIMEOUT),
1070    "Controls the connect timeout for connections to SSH bastions via `mz_ssh_util`.",
1071    false,
1072);
1073
1074/// Controls the keepalive idle interval for connections to SSH bastions via `mz_ssh_util`.
1075pub static SSH_KEEPALIVES_IDLE: VarDefinition = VarDefinition::new(
1076    "ssh_keepalives_idle",
1077    value!(Duration; mz_ssh_util::tunnel::DEFAULT_KEEPALIVES_IDLE),
1078    "Controls the keepalive idle interval for connections to SSH bastions via `mz_ssh_util`.",
1079    false,
1080);
1081
1082/// Enables `socket.keepalive.enable` for rdkafka client connections. Defaults to true.
1083pub static KAFKA_SOCKET_KEEPALIVE: VarDefinition = VarDefinition::new(
1084    "kafka_socket_keepalive",
1085    value!(bool; mz_kafka_util::client::DEFAULT_KEEPALIVE),
1086    "Enables `socket.keepalive.enable` for rdkafka client connections. Defaults to true.",
1087    false,
1088);
1089
1090/// Controls `socket.timeout.ms` for rdkafka client connections. Defaults to the rdkafka default
1091/// (60000ms). Cannot be greater than 300000ms, more than 100ms greater than
1092/// `kafka_transaction_timeout`, or less than 10ms.
1093pub static KAFKA_SOCKET_TIMEOUT: VarDefinition = VarDefinition::new(
1094    "kafka_socket_timeout",
1095    value!(Option<Duration>; None),
1096    "Controls `socket.timeout.ms` for rdkafka \
1097        client connections. Defaults to the rdkafka default (60000ms) or \
1098        the set transaction timeout + 100ms, whichever one is smaller. \
1099        Cannot be greater than 300000ms, more than 100ms greater than \
1100        `kafka_transaction_timeout`, or less than 10ms.",
1101    false,
1102);
1103
1104/// Controls `transaction.timeout.ms` for rdkafka client connections. Defaults to the rdkafka default
1105/// (60000ms). Cannot be greater than `i32::MAX` or less than 1000ms.
1106pub static KAFKA_TRANSACTION_TIMEOUT: VarDefinition = VarDefinition::new(
1107    "kafka_transaction_timeout",
1108    value!(Duration; mz_kafka_util::client::DEFAULT_TRANSACTION_TIMEOUT),
1109    "Controls `transaction.timeout.ms` for rdkafka \
1110        client connections. Defaults to the 10min. \
1111        Cannot be greater than `i32::MAX` or less than 1000ms.",
1112    false,
1113);
1114
1115/// Controls `socket.connection.setup.timeout.ms` for rdkafka client connections. Defaults to the rdkafka default
1116/// (30000ms). Cannot be greater than `i32::MAX` or less than 1000ms
1117pub static KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT: VarDefinition = VarDefinition::new(
1118    "kafka_socket_connection_setup_timeout",
1119    value!(Duration; mz_kafka_util::client::DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT),
1120    "Controls `socket.connection.setup.timeout.ms` for rdkafka \
1121        client connections. Defaults to the rdkafka default (30000ms). \
1122        Cannot be greater than `i32::MAX` or less than 1000ms",
1123    false,
1124);
1125
1126/// Controls the timeout when fetching kafka metadata. Defaults to 10s.
1127pub static KAFKA_FETCH_METADATA_TIMEOUT: VarDefinition = VarDefinition::new(
1128    "kafka_fetch_metadata_timeout",
1129    value!(Duration; mz_kafka_util::client::DEFAULT_FETCH_METADATA_TIMEOUT),
1130    "Controls the timeout when fetching kafka metadata. \
1131        Defaults to 10s.",
1132    false,
1133);
1134
1135/// Controls the timeout when fetching kafka progress records. Defaults to 60s.
1136pub static KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT: VarDefinition = VarDefinition::new(
1137    "kafka_progress_record_fetch_timeout",
1138    value!(Option<Duration>; None),
1139    "Controls the timeout when fetching kafka progress records. \
1140        Defaults to 60s or the transaction timeout, whichever one is larger.",
1141    false,
1142);
1143
1144/// The maximum number of in-flight bytes emitted by persist_sources feeding _storage
1145/// dataflows_.
1146/// Currently defaults to 256MiB = 268435456 bytes
1147/// Note: Backpressure will only be turned on if disk is enabled based on
1148/// `storage_dataflow_max_inflight_bytes_disk_only` flag
1149pub static STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES: VarDefinition = VarDefinition::new(
1150    "storage_dataflow_max_inflight_bytes",
1151    value!(Option<usize>; Some(256 * 1024 * 1024)),
1152    "The maximum number of in-flight bytes emitted by persist_sources feeding \
1153        storage dataflows. Defaults to backpressure enabled (Materialize).",
1154    false,
1155);
1156
1157/// Configuration ratio to shrink unusef buffers in upsert by.
1158/// For eg: is 2 is set, then the buffers will be reduced by 2 i.e. halved.
1159/// Default is 0, which means shrinking is disabled.
1160pub static STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO: VarDefinition = VarDefinition::new(
1161    "storage_shrink_upsert_unused_buffers_by_ratio",
1162    value!(usize; 0),
1163    "Configuration ratio to shrink unusef buffers in upsert by",
1164    false,
1165);
1166
1167/// The fraction of the cluster replica size to be used as the maximum number of
1168/// in-flight bytes emitted by persist_sources feeding storage dataflows.
1169/// If not configured, the storage_dataflow_max_inflight_bytes value will be used.
1170/// For this value to be used storage_dataflow_max_inflight_bytes needs to be set.
1171pub static STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION: VarDefinition =
1172    VarDefinition::new_lazy(
1173        "storage_dataflow_max_inflight_bytes_to_cluster_size_fraction",
1174        lazy_value!(Option<Numeric>; || Some(0.01.into())),
1175        "The fraction of the cluster replica size to be used as the maximum number of \
1176            in-flight bytes emitted by persist_sources feeding storage dataflows. \
1177            If not configured, the storage_dataflow_max_inflight_bytes value will be used.",
1178        false,
1179    );
1180
1181pub static STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY: VarDefinition = VarDefinition::new(
1182    "storage_dataflow_max_inflight_bytes_disk_only",
1183    value!(bool; true),
1184    "Whether or not `storage_dataflow_max_inflight_bytes` applies only to \
1185        upsert dataflows using disks. Defaults to true (Materialize).",
1186    false,
1187);
1188
1189/// The interval to submit statistics to `mz_source_statistics_per_worker` and `mz_sink_statistics_per_worker`.
1190pub static STORAGE_STATISTICS_INTERVAL: VarDefinition = VarDefinition::new(
1191    "storage_statistics_interval",
1192    value!(Duration; mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT),
1193    "The interval to submit statistics to `mz_source_statistics_per_worker` \
1194        and `mz_sink_statistics` (Materialize).",
1195    false,
1196);
1197
1198/// The interval to collect statistics for `mz_source_statistics_per_worker` and `mz_sink_statistics_per_worker` in
1199/// clusterd. Controls the accuracy of metrics.
1200pub static STORAGE_STATISTICS_COLLECTION_INTERVAL: VarDefinition = VarDefinition::new(
1201    "storage_statistics_collection_interval",
1202    value!(Duration; mz_storage_types::parameters::STATISTICS_COLLECTION_INTERVAL_DEFAULT),
1203    "The interval to collect statistics for `mz_source_statistics_per_worker` \
1204        and `mz_sink_statistics_per_worker` in clusterd. Controls the accuracy of metrics \
1205        (Materialize).",
1206    false,
1207);
1208
1209pub static STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS: VarDefinition = VarDefinition::new(
1210    "storage_record_source_sink_namespaced_errors",
1211    value!(bool; true),
1212    "Whether or not to record namespaced errors in the status history tables",
1213    false,
1214);
1215
1216/// Boolean flag indicating whether to enable syncing from
1217/// LaunchDarkly. Can be turned off as an emergency measure to still
1218/// be able to alter parameters while LD is broken.
1219pub static ENABLE_LAUNCHDARKLY: VarDefinition = VarDefinition::new(
1220    "enable_launchdarkly",
1221    value!(bool; true),
1222    "Boolean flag indicating whether flag synchronization from LaunchDarkly should be enabled (Materialize).",
1223    false,
1224);
1225
1226/// Feature flag indicating whether real time recency is enabled. Not that
1227/// unlike other feature flags, this is made available at the session level, so
1228/// is additionally gated by a feature flag.
1229pub static REAL_TIME_RECENCY: VarDefinition = VarDefinition::new(
1230    "real_time_recency",
1231    value!(bool; false),
1232    "Feature flag indicating whether real time recency is enabled (Materialize).",
1233    true,
1234)
1235.with_feature_flag(&ALLOW_REAL_TIME_RECENCY);
1236
1237pub static REAL_TIME_RECENCY_TIMEOUT: VarDefinition = VarDefinition::new(
1238    "real_time_recency_timeout",
1239    value!(Duration; Duration::from_secs(10)),
1240    "Sets the maximum allowed duration of SELECTs that actively use real-time \
1241    recency, i.e. reach out to an external system to determine their most recencly exposed \
1242    data (Materialize).",
1243    true,
1244)
1245.with_feature_flag(&ALLOW_REAL_TIME_RECENCY);
1246
1247pub static EMIT_PLAN_INSIGHTS_NOTICE: VarDefinition = VarDefinition::new(
1248    "emit_plan_insights_notice",
1249    value!(bool; false),
1250    "Boolean flag indicating whether to send a NOTICE with JSON-formatted plan insights before executing a SELECT statement (Materialize).",
1251    true,
1252);
1253
1254pub static EMIT_TIMESTAMP_NOTICE: VarDefinition = VarDefinition::new(
1255    "emit_timestamp_notice",
1256    value!(bool; false),
1257    "Boolean flag indicating whether to send a NOTICE with timestamp explanations of queries (Materialize).",
1258    true,
1259);
1260
1261pub static EMIT_TRACE_ID_NOTICE: VarDefinition = VarDefinition::new(
1262    "emit_trace_id_notice",
1263    value!(bool; false),
1264    "Boolean flag indicating whether to send a NOTICE specifying the trace id when available (Materialize).",
1265    true,
1266);
1267
1268pub static UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP: VarDefinition = VarDefinition::new(
1269    "unsafe_mock_audit_event_timestamp",
1270    value!(Option<mz_repr::Timestamp>; None),
1271    "Mocked timestamp to use for audit events for testing purposes",
1272    false,
1273);
1274
1275pub static ENABLE_RBAC_CHECKS: VarDefinition = VarDefinition::new(
1276    "enable_rbac_checks",
1277    value!(bool; true),
1278    "User facing global boolean flag indicating whether to apply RBAC checks before \
1279        executing statements (Materialize).",
1280    true,
1281);
1282
1283pub static ENABLE_SESSION_RBAC_CHECKS: VarDefinition = VarDefinition::new(
1284    "enable_session_rbac_checks",
1285    // TODO(jkosh44) Once RBAC is enabled in all environments, change this to `true`.
1286    value!(bool; false),
1287    "User facing session boolean flag indicating whether to apply RBAC checks before \
1288        executing statements (Materialize).",
1289    true,
1290);
1291
1292pub static RESTRICT_TO_USER_OBJECTS: VarDefinition = VarDefinition::new(
1293    "restrict_to_user_objects",
1294    value!(bool; false),
1295    "When enabled, queries are restricted from accessing system catalog objects. \
1296        Useful for MCP tool queries that should only access user-created data products.",
1297    true,
1298);
1299
1300pub static EMIT_INTROSPECTION_QUERY_NOTICE: VarDefinition = VarDefinition::new(
1301    "emit_introspection_query_notice",
1302    value!(bool; true),
1303    "Whether to print a notice when querying per-replica introspection sources.",
1304    true,
1305);
1306
1307// TODO(mgree) change this to a SelectOption
1308pub static ENABLE_SESSION_CARDINALITY_ESTIMATES: VarDefinition = VarDefinition::new(
1309    "enable_session_cardinality_estimates",
1310    value!(bool; false),
1311    "Feature flag indicating whether to use cardinality estimates when optimizing queries; \
1312        does not affect EXPLAIN WITH(cardinality) (Materialize).",
1313    true,
1314)
1315.with_feature_flag(&ENABLE_CARDINALITY_ESTIMATES);
1316
1317pub static OPTIMIZER_STATS_TIMEOUT: VarDefinition = VarDefinition::new(
1318    "optimizer_stats_timeout",
1319    value!(Duration; Duration::from_millis(250)),
1320    "Sets the timeout applied to the optimizer's statistics collection from storage; \
1321        applied to non-oneshot, i.e., long-lasting queries, like CREATE MATERIALIZED VIEW (Materialize).",
1322    false,
1323);
1324
1325pub static OPTIMIZER_ONESHOT_STATS_TIMEOUT: VarDefinition = VarDefinition::new(
1326    "optimizer_oneshot_stats_timeout",
1327    value!(Duration; Duration::from_millis(10)),
1328    "Sets the timeout applied to the optimizer's statistics collection from storage; \
1329        applied to oneshot queries, like SELECT (Materialize).",
1330    false,
1331);
1332
1333pub static PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE: VarDefinition = VarDefinition::new(
1334    "privatelink_status_update_quota_per_minute",
1335    value!(u32; 20),
1336    "Sets the per-minute quota for privatelink vpc status updates to be written to \
1337        the storage-collection-backed system table. This value implies the total and burst quota per-minute.",
1338    false,
1339);
1340
1341pub static STATEMENT_LOGGING_SAMPLE_RATE: VarDefinition = VarDefinition::new_lazy(
1342    "statement_logging_sample_rate",
1343    lazy_value!(Numeric; || 0.1.into()),
1344    "User-facing session variable indicating how many statement executions should be \
1345        logged, subject to constraint by the system variable `statement_logging_max_sample_rate` (Materialize).",
1346    true,
1347).with_constraint(&NUMERIC_BOUNDED_0_1_INCLUSIVE);
1348
1349pub static ENABLE_DEFAULT_CONNECTION_VALIDATION: VarDefinition = VarDefinition::new(
1350    "enable_default_connection_validation",
1351    value!(bool; true),
1352    "LD facing global boolean flag that allows turning default connection validation off for everyone (Materialize).",
1353    false,
1354);
1355
1356pub static STATEMENT_LOGGING_MAX_DATA_CREDIT: VarDefinition = VarDefinition::new(
1357    "statement_logging_max_data_credit",
1358    value!(Option<usize>; Some(50 * 1024 * 1024)),
1359    // The idea is that during periods of low logging, tokens can accumulate up to this value,
1360    // and then be depleted during periods of high logging.
1361    "The maximum number of bytes that can be logged for statement logging in short burts, or NULL if unlimited (Materialize).",
1362    false,
1363);
1364
1365pub static STATEMENT_LOGGING_TARGET_DATA_RATE: VarDefinition = VarDefinition::new(
1366    "statement_logging_target_data_rate",
1367    value!(Option<usize>; Some(2071)),
1368    "The maximum sustained data rate of statement logging, in bytes per second, or NULL if unlimited (Materialize).",
1369    false,
1370);
1371
1372pub static STATEMENT_LOGGING_MAX_SAMPLE_RATE: VarDefinition = VarDefinition::new_lazy(
1373    "statement_logging_max_sample_rate",
1374    lazy_value!(Numeric; || 0.99.into()),
1375    "The maximum rate at which statements may be logged. If this value is less than \
1376        that of `statement_logging_sample_rate`, the latter is ignored (Materialize).",
1377    true,
1378)
1379.with_constraint(&NUMERIC_BOUNDED_0_1_INCLUSIVE);
1380
1381pub static STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE: VarDefinition = VarDefinition::new_lazy(
1382    "statement_logging_default_sample_rate",
1383    lazy_value!(Numeric; || 0.99.into()),
1384    "The default value of `statement_logging_sample_rate` for new sessions (Materialize).",
1385    true,
1386)
1387.with_constraint(&NUMERIC_BOUNDED_0_1_INCLUSIVE);
1388
1389pub static ENABLE_INTERNAL_STATEMENT_LOGGING: VarDefinition = VarDefinition::new(
1390    "enable_internal_statement_logging",
1391    value!(bool; false),
1392    "Whether to log statements from the `mz_system` user.",
1393    false,
1394);
1395
1396pub static AUTO_ROUTE_CATALOG_QUERIES: VarDefinition = VarDefinition::new(
1397    "auto_route_catalog_queries",
1398    value!(bool; true),
1399    "Whether to force queries that depend only on system tables, to run on the mz_catalog_server cluster (Materialize).",
1400    true,
1401);
1402
1403pub static MAX_CONNECTIONS: VarDefinition = VarDefinition::new(
1404    "max_connections",
1405    value!(u32; 5000),
1406    "The maximum number of concurrent connections (PostgreSQL).",
1407    true,
1408);
1409
1410pub static SUPERUSER_RESERVED_CONNECTIONS: VarDefinition = VarDefinition::new(
1411    "superuser_reserved_connections",
1412    value!(u32; 3),
1413    "The number of connections that are reserved for superusers (PostgreSQL).",
1414    true,
1415);
1416
1417/// Controls [`mz_storage_types::parameters::StorageParameters::keep_n_source_status_history_entries`].
1418pub static KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES: VarDefinition = VarDefinition::new(
1419    "keep_n_source_status_history_entries",
1420    value!(usize; 5),
1421    "On reboot, truncate all but the last n entries per ID in the source_status_history collection (Materialize).",
1422    false,
1423);
1424
1425/// Controls [`mz_storage_types::parameters::StorageParameters::keep_n_sink_status_history_entries`].
1426pub static KEEP_N_SINK_STATUS_HISTORY_ENTRIES: VarDefinition = VarDefinition::new(
1427    "keep_n_sink_status_history_entries",
1428    value!(usize; 5),
1429    "On reboot, truncate all but the last n entries per ID in the sink_status_history collection (Materialize).",
1430    false,
1431);
1432
1433/// Controls [`mz_storage_types::parameters::StorageParameters::keep_n_privatelink_status_history_entries`].
1434pub static KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES: VarDefinition = VarDefinition::new(
1435    "keep_n_privatelink_status_history_entries",
1436    value!(usize; 5),
1437    "On reboot, truncate all but the last n entries per ID in the mz_aws_privatelink_connection_status_history \
1438        collection (Materialize).",
1439    false,
1440);
1441
1442/// Controls [`mz_storage_types::parameters::StorageParameters::replica_status_history_retention_window`].
1443pub static REPLICA_STATUS_HISTORY_RETENTION_WINDOW: VarDefinition = VarDefinition::new(
1444    "replica_status_history_retention_window",
1445    value!(Duration; REPLICA_STATUS_HISTORY_RETENTION_WINDOW_DEFAULT),
1446    "On reboot, truncate up all entries past the retention window in the mz_cluster_replica_status_history \
1447        collection (Materialize).",
1448    false,
1449);
1450
1451pub static ENABLE_STORAGE_SHARD_FINALIZATION: VarDefinition = VarDefinition::new(
1452    "enable_storage_shard_finalization",
1453    value!(bool; true),
1454    "Whether to allow the storage client to finalize shards (Materialize).",
1455    false,
1456);
1457
1458pub static DEFAULT_TIMESTAMP_INTERVAL: VarDefinition = VarDefinition::new(
1459    "default_timestamp_interval",
1460    value!(Duration; Duration::from_millis(1000)),
1461    "The interval at which timestamps are assigned to data from sources and tables.",
1462    false,
1463)
1464.with_constraint(&NON_ZERO_DURATION);
1465
1466pub static MIN_TIMESTAMP_INTERVAL: VarDefinition = VarDefinition::new(
1467    "min_timestamp_interval",
1468    value!(Duration; Duration::from_millis(1000)),
1469    "Minimum timestamp interval",
1470    false,
1471);
1472
1473pub static MAX_TIMESTAMP_INTERVAL: VarDefinition = VarDefinition::new(
1474    "max_timestamp_interval",
1475    value!(Duration; Duration::from_millis(1000)),
1476    "Maximum timestamp interval",
1477    false,
1478);
1479
1480pub static WEBHOOK_CONCURRENT_REQUEST_LIMIT: VarDefinition = VarDefinition::new(
1481    "webhook_concurrent_request_limit",
1482    value!(usize; WEBHOOK_CONCURRENCY_LIMIT),
1483    "Maximum number of concurrent requests for appending to a webhook source.",
1484    false,
1485);
1486
1487pub static USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION: VarDefinition = VarDefinition::new(
1488    "user_storage_managed_collections_batch_duration",
1489    value!(Duration; STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT),
1490    "Duration which we'll wait to collect a batch of events for a webhook source.",
1491    false,
1492);
1493
1494// This system var will need to point to the name of an existing network policy
1495// this will be enforced on alter_system_set
1496pub static NETWORK_POLICY: VarDefinition = VarDefinition::new_lazy(
1497    "network_policy",
1498    lazy_value!(String; || "default".to_string()),
1499    "Sets the fallback network policy applied to all users without an explicit policy.",
1500    true,
1501);
1502
1503pub static FORCE_SOURCE_TABLE_SYNTAX: VarDefinition = VarDefinition::new(
1504    "force_source_table_syntax",
1505    value!(bool; false),
1506    "Force use of new source model (CREATE TABLE .. FROM SOURCE) and migrate existing sources",
1507    true,
1508);
1509
1510pub static OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD: VarDefinition = VarDefinition::new(
1511    "optimizer_e2e_latency_warning_threshold",
1512    value!(Duration; Duration::from_millis(500)),
1513    "Sets the duration that a query can take to compile; queries that take longer \
1514        will trigger a warning. If this value is specified without units, it is taken as \
1515        milliseconds. A value of zero disables the timeout (Materialize).",
1516    true,
1517);
1518
1519/// Configuration for gRPC client connections.
1520pub mod grpc_client {
1521    use super::*;
1522
1523    pub static CONNECT_TIMEOUT: VarDefinition = VarDefinition::new(
1524        "grpc_client_connect_timeout",
1525        value!(Duration; Duration::from_secs(5)),
1526        "Timeout to apply to initial gRPC client connection establishment.",
1527        false,
1528    );
1529
1530    pub static HTTP2_KEEP_ALIVE_INTERVAL: VarDefinition = VarDefinition::new(
1531        "grpc_client_http2_keep_alive_interval",
1532        value!(Duration; Duration::from_secs(3)),
1533        "Idle time to wait before sending HTTP/2 PINGs to maintain established gRPC client connections.",
1534        false,
1535    );
1536
1537    pub static HTTP2_KEEP_ALIVE_TIMEOUT: VarDefinition = VarDefinition::new(
1538        "grpc_client_http2_keep_alive_timeout",
1539        value!(Duration; Duration::from_secs(60)),
1540        "Time to wait for HTTP/2 pong response before terminating a gRPC client connection.",
1541        false,
1542    );
1543}
1544
1545/// Configuration for how cluster replicas are scheduled.
1546pub mod cluster_scheduling {
1547    use super::*;
1548    use mz_orchestrator::scheduling_config::*;
1549
1550    pub static CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT: VarDefinition = VarDefinition::new(
1551        "cluster_multi_process_replica_az_affinity_weight",
1552        value!(Option<i32>; DEFAULT_POD_AZ_AFFINITY_WEIGHT),
1553        "Whether or not to add an availability zone affinity between instances of \
1554            multi-process replicas. Either an affinity weight or empty (off) (Materialize).",
1555        false,
1556    );
1557
1558    pub static CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY: VarDefinition = VarDefinition::new(
1559        "cluster_soften_replication_anti_affinity",
1560        value!(bool; DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY),
1561        "Whether or not to turn the node-scope anti affinity between replicas \
1562            in the same cluster into a preference (Materialize).",
1563        false,
1564    );
1565
1566    pub static CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT: VarDefinition = VarDefinition::new(
1567        "cluster_soften_replication_anti_affinity_weight",
1568        value!(i32; DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT),
1569        "The preference weight for `cluster_soften_replication_anti_affinity` (Materialize).",
1570        false,
1571    );
1572
1573    pub static CLUSTER_ENABLE_TOPOLOGY_SPREAD: VarDefinition = VarDefinition::new(
1574        "cluster_enable_topology_spread",
1575        value!(bool; DEFAULT_TOPOLOGY_SPREAD_ENABLED),
1576        "Whether or not to add topology spread constraints among replicas in the same cluster (Materialize).",
1577        false,
1578    );
1579
1580    pub static CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE: VarDefinition =
1581        VarDefinition::new(
1582            "cluster_topology_spread_ignore_non_singular_scale",
1583            value!(bool; DEFAULT_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE),
1584            "If true, ignore replicas with more than 1 process when adding topology spread constraints (Materialize).",
1585            false,
1586        );
1587
1588    pub static CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW: VarDefinition = VarDefinition::new(
1589        "cluster_topology_spread_max_skew",
1590        value!(i32; DEFAULT_TOPOLOGY_SPREAD_MAX_SKEW),
1591        "The `maxSkew` for replica topology spread constraints (Materialize).",
1592        false,
1593    );
1594
1595    // `minDomains`, like maxSkew, is used to spread across a topology
1596    // key. Unlike max skew, minDomains will force node creation to ensure
1597    // distribution across a minimum number of keys.
1598    // https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/#spread-constraint-definition
1599    pub static CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS: VarDefinition = VarDefinition::new(
1600        "cluster_topology_spread_min_domains",
1601        value!(Option<i32>; None),
1602        "`minDomains` for replica topology spread constraints. \
1603            Should be set to the number of Availability Zones (Materialize).",
1604        false,
1605    );
1606
1607    pub static CLUSTER_TOPOLOGY_SPREAD_SOFT: VarDefinition = VarDefinition::new(
1608        "cluster_topology_spread_soft",
1609        value!(bool; DEFAULT_TOPOLOGY_SPREAD_SOFT),
1610        "If true, soften the topology spread constraints for replicas (Materialize).",
1611        false,
1612    );
1613
1614    pub static CLUSTER_SOFTEN_AZ_AFFINITY: VarDefinition = VarDefinition::new(
1615        "cluster_soften_az_affinity",
1616        value!(bool; DEFAULT_SOFTEN_AZ_AFFINITY),
1617        "Whether or not to turn the az-scope node affinity for replicas. \
1618            Note this could violate requests from the user (Materialize).",
1619        false,
1620    );
1621
1622    pub static CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT: VarDefinition = VarDefinition::new(
1623        "cluster_soften_az_affinity_weight",
1624        value!(i32; DEFAULT_SOFTEN_AZ_AFFINITY_WEIGHT),
1625        "The preference weight for `cluster_soften_az_affinity` (Materialize).",
1626        false,
1627    );
1628
1629    const DEFAULT_CLUSTER_ALTER_CHECK_READY_INTERVAL: Duration = Duration::from_secs(3);
1630
1631    pub static CLUSTER_ALTER_CHECK_READY_INTERVAL: VarDefinition = VarDefinition::new(
1632        "cluster_alter_check_ready_interval",
1633        value!(Duration; DEFAULT_CLUSTER_ALTER_CHECK_READY_INTERVAL),
1634        "How often to poll readiness checks for cluster alter",
1635        false,
1636    );
1637
1638    const DEFAULT_CHECK_SCHEDULING_POLICIES_INTERVAL: Duration = Duration::from_secs(3);
1639
1640    pub static CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL: VarDefinition = VarDefinition::new(
1641        "cluster_check_scheduling_policies_interval",
1642        value!(Duration; DEFAULT_CHECK_SCHEDULING_POLICIES_INTERVAL),
1643        "How often policies are invoked to automatically start/stop clusters, e.g., \
1644            for REFRESH EVERY materialized views.",
1645        false,
1646    )
1647    .with_constraint(&NON_ZERO_DURATION);
1648
1649    pub static CLUSTER_SECURITY_CONTEXT_ENABLED: VarDefinition = VarDefinition::new(
1650        "cluster_security_context_enabled",
1651        value!(bool; DEFAULT_SECURITY_CONTEXT_ENABLED),
1652        "Enables SecurityContext for clusterd instances, restricting capabilities to improve security.",
1653        false,
1654    );
1655
1656    const DEFAULT_CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE: Duration = Duration::from_secs(1200);
1657
1658    pub static CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE: VarDefinition = VarDefinition::new(
1659        "cluster_refresh_mv_compaction_estimate",
1660        value!(Duration; DEFAULT_CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE),
1661        "How much time to wait for compaction after a REFRESH MV completes a refresh \
1662            before turning off the refresh cluster. This is needed because Persist does compaction \
1663            only after a write, but refresh MVs do writes only at their refresh times. \
1664            (In the long term, we'd like to remove this configuration and instead wait exactly \
1665            until compaction has settled. We'd need some new Persist API for this.)",
1666        false,
1667    );
1668}
1669
1670/// Macro to simplify creating feature flags, i.e. boolean flags that we use to toggle the
1671/// availability of features.
1672///
1673/// The arguments to `feature_flags!` are:
1674/// - `$name`, which will be the name of the feature flag, in snake_case
1675/// - `$feature_desc`, a human-readable description of the feature
1676/// - `$value`, which if not provided, defaults to `false`
1677///
1678/// Note that not all `VarDefinition<bool>` are feature flags. Feature flags are for variables that:
1679/// - Belong to `SystemVars`, _not_ `SessionVars`
1680/// - Default to false and must be explicitly enabled, or default to `true` and can be explicitly disabled.
1681///
1682/// WARNING / CONTRACT: Syntax-related feature flags must always *enable* behavior. In other words,
1683/// setting a feature flag must make the system more permissive. For example, let's suppose we'd like
1684/// to gate deprecated upsert syntax behind a feature flag. In this case, do not add a feature flag
1685/// like `disable_deprecated_upsert_syntax`, as `disable_deprecated_upsert_syntax = on` would
1686/// _prevent_ the system from parsing the deprecated upsert syntax. Instead, use a feature flag
1687/// like `enable_deprecated_upsert_syntax`.
1688///
1689/// The hazard this protects against is related to reboots after feature flags have been disabled.
1690/// Say someone creates a Kinesis source while `enable_kinesis_sources = on`. Materialize will
1691/// commit this source to the system catalog. Then, suppose we discover a catastrophic bug in
1692/// Kinesis sources and set `enable_kinesis_sources` to `off`. This prevents users from creating
1693/// new Kinesis sources, but leaves the existing Kinesis sources in place. This is because
1694/// disabling a feature flag doesn't remove access to catalog objects created while the feature
1695/// flag was live. On the next reboot, Materialize will proceed to load the Kinesis source from the
1696/// catalog, reparsing and replanning the `CREATE SOURCE` definition and rechecking the
1697/// `enable_kinesis_sources` feature flag along the way. Even though the feature flag has been
1698/// switched to `off`, we need to temporarily re-enable it during parsing and planning to be able
1699/// to boot successfully.
1700///
1701/// Ensuring that all syntax-related feature flags *enable* behavior means that setting all such
1702/// feature flags to `on` during catalog boot has the desired effect.
1703macro_rules! feature_flags {
1704    // Resolve an optional `scope:` field to a `ParameterScope`, using the
1705    // default scope when the field is omitted.
1706    (@scope_or_default) => {
1707        ParameterScope::DEFAULT
1708    };
1709    (@scope_or_default $scope:expr) => {
1710        $scope
1711    };
1712    // Match `$name, $feature_desc, $value`.
1713    (@inner
1714        // The feature flag name.
1715        name: $name:expr,
1716        // The feature flag description.
1717        desc: $desc:literal,
1718        // The feature flag default value.
1719        default: $value:expr,
1720        // The scope class of the feature flag.
1721        scope: $scope:expr,
1722    ) => {
1723        paste::paste!{
1724            // Note that the ServerVar is not directly exported; we expect these to be
1725            // accessible through their FeatureFlag variant.
1726            static [<$name:upper _VAR>]: VarDefinition = VarDefinition::new(
1727                stringify!($name),
1728                value!(bool; $value),
1729                concat!("Whether ", $desc, " is allowed (Materialize)."),
1730                false,
1731            )
1732            .scoped($scope);
1733
1734            pub static [<$name:upper >]: FeatureFlag = FeatureFlag {
1735                flag: &[<$name:upper _VAR>],
1736                feature_desc: $desc,
1737            };
1738        }
1739    };
1740    ($({
1741        // The feature flag name.
1742        name: $name:expr,
1743        // The feature flag description.
1744        desc: $desc:literal,
1745        // The feature flag default value.
1746        default: $value:expr,
1747        // Should the feature be turned on during catalog rehydration when
1748        // parsing a catalog item.
1749        enable_for_item_parsing: $enable_for_item_parsing:expr,
1750        // The optional scope class. Uses `ParameterScope::DEFAULT` when omitted.
1751        // Cluster-coherent optimizer flags declare `scope: ParameterScope::Cluster`.
1752        $(scope: $scope:expr,)?
1753    },)+) => {
1754        $(feature_flags! { @inner
1755            name: $name,
1756            desc: $desc,
1757            default: $value,
1758            scope: feature_flags!(@scope_or_default $($scope)?),
1759        })+
1760
1761        paste::paste!{
1762            pub static FEATURE_FLAGS: &'static [&'static VarDefinition] = &[
1763                $(  & [<$name:upper _VAR>] , )+
1764            ];
1765        }
1766
1767        paste::paste!{
1768            impl super::SystemVars {
1769                pub fn enable_all_feature_flags_by_default(&mut self) {
1770                    $(
1771                        self.set_default(stringify!($name), super::VarInput::Flat("on"))
1772                            .expect("setting default value must work");
1773                    )+
1774                }
1775
1776                pub fn enable_for_item_parsing(&mut self) {
1777                    $(
1778                        if $enable_for_item_parsing {
1779                            self.set(stringify!($name), super::VarInput::Flat("on"))
1780                                .expect("setting default value must work");
1781                        }
1782                    )+
1783                }
1784
1785                $(
1786                    pub fn [<$name:lower>](&self) -> bool {
1787                        *self.expect_value(&[<$name:upper _VAR>])
1788                    }
1789                )+
1790            }
1791        }
1792    }
1793}
1794
1795feature_flags!(
1796    // Gates for other feature flags
1797    {
1798        name: allow_real_time_recency,
1799        desc: "real time recency",
1800        default: false,
1801        enable_for_item_parsing: true,
1802    },
1803    // Actual feature flags
1804    {
1805        name: enable_binary_date_bin,
1806        desc: "the binary version of date_bin function",
1807        default: false,
1808        enable_for_item_parsing: true,
1809    },
1810    {
1811        name: enable_date_bin_hopping,
1812        desc: "the date_bin_hopping function",
1813        default: false,
1814        enable_for_item_parsing: true,
1815    },
1816    {
1817        name: enable_envelope_debezium_in_subscribe,
1818        desc: "`ENVELOPE DEBEZIUM (KEY (..))`",
1819        default: false,
1820        enable_for_item_parsing: true,
1821    },
1822    {
1823        name: enable_envelope_materialize,
1824        desc: "ENVELOPE MATERIALIZE",
1825        default: false,
1826        enable_for_item_parsing: true,
1827    },
1828    {
1829        name: enable_explain_pushdown,
1830        desc: "EXPLAIN FILTER PUSHDOWN",
1831        default: true,
1832        enable_for_item_parsing: true,
1833    },
1834    {
1835        name: enable_index_options,
1836        desc: "INDEX OPTIONS",
1837        default: false,
1838        enable_for_item_parsing: true,
1839    },
1840    {
1841        name: enable_list_length_max,
1842        desc: "the list_length_max function",
1843        default: false,
1844        enable_for_item_parsing: true,
1845    },
1846    {
1847        name: enable_list_n_layers,
1848        desc: "the list_n_layers function",
1849        default: false,
1850        enable_for_item_parsing: true,
1851    },
1852    {
1853        name: enable_list_remove,
1854        desc: "the list_remove function",
1855        default: false,
1856        enable_for_item_parsing: true,
1857    },
1858    {
1859
1860        name: enable_logical_compaction_window,
1861        desc: "RETAIN HISTORY",
1862        default: false,
1863        enable_for_item_parsing: true,
1864    },
1865    {
1866        name: enable_primary_key_not_enforced,
1867        desc: "PRIMARY KEY NOT ENFORCED",
1868        default: false,
1869        enable_for_item_parsing: true,
1870    },
1871    {
1872        name: enable_collection_partition_by,
1873        desc: "PARTITION BY",
1874        default: true,
1875        enable_for_item_parsing: true,
1876    },
1877    {
1878        name: enable_multi_worker_storage_persist_sink,
1879        desc: "multi-worker storage persist sink",
1880        default: true,
1881        enable_for_item_parsing: true,
1882    },
1883    {
1884        name: enable_persist_streaming_snapshot_and_fetch,
1885        desc: "use the new streaming consolidate for snapshot_and_fetch",
1886        default: false,
1887        enable_for_item_parsing: true,
1888    },
1889    {
1890        name: enable_persist_streaming_compaction,
1891        desc: "use the new streaming consolidate for compaction",
1892        default: false,
1893        enable_for_item_parsing: true,
1894    },
1895    {
1896        name: enable_raise_statement,
1897        desc: "RAISE statement",
1898        default: false,
1899        enable_for_item_parsing: true,
1900    },
1901    {
1902        name: enable_repeat_row,
1903        desc: "the repeat_row function",
1904        default: false,
1905        enable_for_item_parsing: true,
1906    },
1907    {
1908        name: enable_repeat_row_non_negative,
1909        desc: "the repeat_row_non_negative function",
1910        default: false,
1911        enable_for_item_parsing: true,
1912    },
1913    {
1914        name: enable_replica_targeted_materialized_views,
1915        desc: "replica-targeted materialized views",
1916        default: false,
1917        enable_for_item_parsing: true,
1918    },
1919    {
1920        name: unsafe_enable_table_check_constraint,
1921        desc: "CREATE TABLE with a check constraint",
1922        default: false,
1923        enable_for_item_parsing: true,
1924    },
1925    {
1926        name: unsafe_enable_table_foreign_key,
1927        desc: "CREATE TABLE with a foreign key",
1928        default: false,
1929        enable_for_item_parsing: true,
1930    },
1931    {
1932        name: unsafe_enable_table_keys,
1933        desc: "CREATE TABLE with a primary key or unique constraint",
1934        default: false,
1935        enable_for_item_parsing: true,
1936    },
1937    {
1938        name: unsafe_enable_unorchestrated_cluster_replicas,
1939        desc: "unorchestrated cluster replicas",
1940        default: false,
1941        enable_for_item_parsing: true,
1942    },
1943    {
1944        name: unsafe_enable_unstable_dependencies,
1945        desc: "depending on unstable objects",
1946        default: false,
1947        enable_for_item_parsing: true,
1948    },
1949    {
1950        name: enable_within_timestamp_order_by_in_subscribe,
1951        desc: "`WITHIN TIMESTAMP ORDER BY ..`",
1952        default: false,
1953        enable_for_item_parsing: true,
1954    },
1955    {
1956        name: enable_cardinality_estimates,
1957        desc: "join planning with cardinality estimates",
1958        default: false,
1959        enable_for_item_parsing: false,
1960    },
1961    {
1962        name: enable_connection_validation_syntax,
1963        desc: "CREATE CONNECTION .. WITH (VALIDATE) and VALIDATE CONNECTION syntax",
1964        default: true,
1965        enable_for_item_parsing: true,
1966    },
1967    {
1968        name: enable_kafka_broker_matching_rules,
1969        desc: "MATCHING broker rules in BROKERS for Kafka PrivateLink connections",
1970        default: false,
1971        enable_for_item_parsing: true,
1972    },
1973    {
1974        name: enable_glue_schema_registry,
1975        desc: "CREATE CONNECTION ... TO AWS GLUE SCHEMA REGISTRY",
1976        default: false,
1977        enable_for_item_parsing: true,
1978    },
1979    {
1980        name: enable_alter_set_cluster,
1981        desc: "ALTER ... SET CLUSTER syntax",
1982        default: false,
1983        enable_for_item_parsing: true,
1984    },
1985    {
1986        name: unsafe_enable_unsafe_functions,
1987        desc: "executing potentially dangerous functions",
1988        default: false,
1989        enable_for_item_parsing: true,
1990    },
1991    {
1992        name: enable_managed_cluster_availability_zones,
1993        desc: "MANAGED, AVAILABILITY ZONES syntax",
1994        default: false,
1995        enable_for_item_parsing: true,
1996    },
1997    {
1998        name: statement_logging_use_reproducible_rng,
1999        desc: "statement logging with reproducible RNG",
2000        default: false,
2001        enable_for_item_parsing: false,
2002    },
2003    {
2004        name: enable_notices_for_index_already_exists,
2005        desc: "emitting notices for IndexAlreadyExists (doesn't affect EXPLAIN)",
2006        default: true,
2007        enable_for_item_parsing: true,
2008    },
2009    {
2010        name: enable_notices_for_index_too_wide_for_literal_constraints,
2011        desc: "emitting notices for IndexTooWideForLiteralConstraints (doesn't affect EXPLAIN)",
2012        default: false,
2013        enable_for_item_parsing: true,
2014    },
2015    {
2016        name: enable_notices_for_index_empty_key,
2017        desc: "emitting notices for indexes with an empty key (doesn't affect EXPLAIN)",
2018        default: true,
2019        enable_for_item_parsing: true,
2020    },
2021    {
2022        name: enable_notices_for_equals_null,
2023        desc: "emitting notices for `= NULL` and `<> NULL` comparisons (doesn't affect EXPLAIN)",
2024        default: true,
2025        enable_for_item_parsing: true,
2026    },
2027    {
2028        name: enable_alter_swap,
2029        desc: "the ALTER SWAP feature for objects",
2030        default: true,
2031        enable_for_item_parsing: true,
2032    },
2033    {
2034        name: enable_new_outer_join_lowering,
2035        desc: "new outer join lowering",
2036        default: true,
2037        enable_for_item_parsing: false,
2038        scope: ParameterScope::Cluster,
2039    },
2040    {
2041        name: enable_time_at_time_zone,
2042        desc: "use of AT TIME ZONE or timezone() with time type",
2043        default: false,
2044        enable_for_item_parsing: true,
2045    },
2046    {
2047        name: enable_load_generator_counter,
2048        desc: "Create a LOAD GENERATOR COUNTER",
2049        default: false,
2050        enable_for_item_parsing: true,
2051    },
2052    {
2053        name: enable_load_generator_clock,
2054        desc: "Create a LOAD GENERATOR CLOCK",
2055        default: false,
2056        enable_for_item_parsing: true,
2057    },
2058    {
2059        name: enable_load_generator_datums,
2060        desc: "Create a LOAD GENERATOR DATUMS",
2061        default: false,
2062        enable_for_item_parsing: true,
2063    },
2064    {
2065        name: enable_load_generator_key_value,
2066        desc: "Create a LOAD GENERATOR KEY VALUE",
2067        default: false,
2068        enable_for_item_parsing: true,
2069    },
2070    {
2071        name: enable_expressions_in_limit_syntax,
2072        desc: "LIMIT <expr> syntax",
2073        default: true,
2074        enable_for_item_parsing: true,
2075    },
2076    {
2077        name: enable_mz_notices,
2078        desc: "Populate the contents of `mz_internal.mz_notices`",
2079        default: true,
2080        enable_for_item_parsing: false,
2081    },
2082    {
2083        name: enable_eager_delta_joins,
2084        desc:
2085            "eager delta joins",
2086        default: false,
2087        enable_for_item_parsing: false,
2088        scope: ParameterScope::Cluster,
2089    },
2090    {
2091        name: enable_off_thread_optimization,
2092        desc: "use off-thread optimization in `CREATE` statements",
2093        default: true,
2094        enable_for_item_parsing: false,
2095    },
2096    {
2097        name: enable_refresh_every_mvs,
2098        desc: "REFRESH EVERY and REFRESH AT materialized views",
2099        default: false,
2100        enable_for_item_parsing: true,
2101    },
2102    {
2103        name: enable_cluster_schedule_refresh,
2104        desc: "`SCHEDULE = ON REFRESH` cluster option",
2105        default: false,
2106        enable_for_item_parsing: true,
2107    },
2108    {
2109        name: enable_reduce_mfp_fusion,
2110        desc: "fusion of MFPs in reductions",
2111        default: true,
2112        enable_for_item_parsing: false,
2113    },
2114    {
2115        name: enable_worker_core_affinity,
2116        desc: "set core affinity for replica worker threads",
2117        default: false,
2118        enable_for_item_parsing: false,
2119    },
2120    {
2121        name: enable_storage_introspection_logs,
2122        desc: "forward storage timely logging events into compute's introspection dataflow",
2123        default: false,
2124        enable_for_item_parsing: false,
2125    },
2126    {
2127        name: enable_session_timelines,
2128        desc: "strong session serializable isolation levels",
2129        default: false,
2130        enable_for_item_parsing: false,
2131    },
2132    {
2133        name: enable_variadic_left_join_lowering,
2134        desc: "Enable joint HIR ⇒ MIR lowering of stacks of left joins",
2135        default: true,
2136        enable_for_item_parsing: false,
2137        scope: ParameterScope::Cluster,
2138    },
2139    {
2140        name: enable_redacted_test_option,
2141        desc: "Enable useless option to test value redaction",
2142        default: false,
2143        enable_for_item_parsing: true,
2144    },
2145    {
2146        name: enable_letrec_fixpoint_analysis,
2147        desc: "Enable Lattice-based fixpoint iteration on LetRec nodes in the Analysis framework",
2148        default: true, // This is just a failsafe switch for the deployment of materialize#25591.
2149        enable_for_item_parsing: false,
2150        scope: ParameterScope::Cluster,
2151    },
2152    {
2153        name: enable_kafka_sink_headers,
2154        desc: "Enable the HEADERS option for Kafka sinks",
2155        default: false,
2156        enable_for_item_parsing: true,
2157    },
2158    {
2159        name: enable_unlimited_retain_history,
2160        desc: "Disable limits on RETAIN HISTORY (below 1s default, and 0 disables compaction).",
2161        default: false,
2162        enable_for_item_parsing: true,
2163    },
2164    {
2165        name: enable_envelope_upsert_inline_errors,
2166        desc: "The VALUE DECODING ERRORS = INLINE option on ENVELOPE UPSERT",
2167        default: true,
2168        enable_for_item_parsing: true,
2169    },
2170    {
2171        name: enable_alter_table_add_column,
2172        desc: "Enable ALTER TABLE ... ADD COLUMN ...",
2173        default: false,
2174        enable_for_item_parsing: false,
2175    },
2176    {
2177        name: enable_zero_downtime_cluster_reconfiguration,
2178        desc: "Enable zero-downtime reconfiguration for alter cluster",
2179        default: false,
2180        enable_for_item_parsing: false,
2181    },
2182    {
2183        name: enable_network_policies,
2184        desc: "ENABLE NETWORK POLICIES",
2185        default: true,
2186        enable_for_item_parsing: true,
2187    },
2188    {
2189        name: enable_create_table_from_source,
2190        desc: "Whether to allow CREATE TABLE .. FROM SOURCE syntax.",
2191        default: true,
2192        enable_for_item_parsing: true,
2193    },
2194    {
2195        name: enable_join_prioritize_arranged,
2196        desc: "Whether join planning should prioritize already-arranged keys over keys with more fields.",
2197        default: false,
2198        enable_for_item_parsing: false,
2199        scope: ParameterScope::Cluster,
2200    },
2201    {
2202        name: enable_projection_pushdown_after_relation_cse,
2203        desc: "Run ProjectionPushdown one more time after the last RelationCSE.",
2204        default: true,
2205        enable_for_item_parsing: false,
2206        scope: ParameterScope::Cluster,
2207    },
2208    {
2209        name: enable_less_reduce_in_eqprop,
2210        desc: "Run MSE::reduce in EquivalencePropagation only if reduce_expr changed something.",
2211        default: true,
2212        enable_for_item_parsing: false,
2213    },
2214    {
2215        name: enable_dequadratic_eqprop_map,
2216        desc: "Skip the quadratic part of EquivalencePropagation's handling of Map.",
2217        default: true,
2218        enable_for_item_parsing: false,
2219    },
2220    {
2221        name: enable_eq_classes_withholding_errors,
2222        desc: "Use `EquivalenceClassesWithholdingErrors` instead of raw `EquivalenceClasses` during eq prop for joins.",
2223        default: true,
2224        enable_for_item_parsing: false,
2225    },
2226    {
2227        name: enable_fast_path_plan_insights,
2228        desc: "Enables those plan insight notices that help with getting fast path queries. Don't turn on before #9492 is fixed!",
2229        default: false,
2230        enable_for_item_parsing: false,
2231    },
2232    {
2233        name: enable_with_ordinality_legacy_fallback,
2234        desc: "When the new WITH ORDINALITY implementation can't be used with a table func, whether to fall back to the legacy implementation or error out.",
2235        default: false,
2236        enable_for_item_parsing: true,
2237    },
2238    {
2239        name: enable_frontend_peek_sequencing, // currently, changes only take effect for new sessions
2240        desc: "Enables the new peek sequencing code, which does most of its work in the Adapter Frontend instead of the Coordinator main task.",
2241        default: true,
2242        enable_for_item_parsing: false,
2243    },
2244    {
2245        name: enable_replacement_materialized_views,
2246        desc: "Whether to enable replacement materialized views.",
2247        default: true,
2248        enable_for_item_parsing: true,
2249    },
2250    {
2251        name: enable_cast_elimination,
2252        desc: "Allow the optimizer to eliminate noop casts between values of equivalent representation types.",
2253        default: true,
2254        enable_for_item_parsing: false,
2255    },
2256    {
2257        // Just an escape hatch for the unlikely case that we have some user who is doing such
2258        // queries. Can be removed after one week in prod.
2259        // https://github.com/MaterializeInc/database-issues/issues/10004
2260        name: disallow_unmaterializable_functions_as_of,
2261        desc: "Prohibits calling unmaterializable functions (except `mz_now`) in AS OF queries.",
2262        default: true,
2263        enable_for_item_parsing: false,
2264    },
2265    {
2266        name: enable_case_literal_transform,
2267        desc: "Allow the optimizer to rewrite If-chains matching a single expression against literals into a CaseLiteral lookup.",
2268        default: false,
2269        enable_for_item_parsing: false,
2270    },
2271    {
2272        name: enable_simplify_quantified_comparisons,
2273        desc: "Allow the optimizer to simplify quantified comparisons in JOIN ON clauses into semi/anti-join EXISTS form during HIR-to-MIR lowering.",
2274        default: true,
2275        enable_for_item_parsing: false,
2276    },
2277    {
2278        name: enable_coalesce_case_transform,
2279        desc: "Allow the optimizer to push `COALESCE` into `CASE WHEN`.",
2280        default: true,
2281        enable_for_item_parsing: false,
2282    },
2283    // Disposition: added 2026-05-29, default on; remove after several weeks of observation.
2284    {
2285        name: enable_will_distinct_propagation,
2286        desc: "Allow the WillDistinct transform to propagate a pending distinct through Map, Filter, FlatMap, Threshold, Negate, non-negative Project, and TopK with limit 1 and offset 0.",
2287        default: true,
2288        enable_for_item_parsing: false,
2289    },
2290    {
2291        name: enable_bounded_staleness_isolation,
2292        desc: "the `bounded staleness <duration>` transaction isolation level",
2293        default: true,
2294        enable_for_item_parsing: false,
2295    },
2296);
2297
2298impl From<&super::SystemVars> for OptimizerFeatures {
2299    fn from(vars: &super::SystemVars) -> Self {
2300        Self {
2301            enable_eager_delta_joins: vars.enable_eager_delta_joins(),
2302            enable_new_outer_join_lowering: vars.enable_new_outer_join_lowering(),
2303            enable_reduce_mfp_fusion: vars.enable_reduce_mfp_fusion(),
2304            enable_variadic_left_join_lowering: vars.enable_variadic_left_join_lowering(),
2305            enable_letrec_fixpoint_analysis: vars.enable_letrec_fixpoint_analysis(),
2306            enable_cardinality_estimates: vars.enable_cardinality_estimates(),
2307            persist_fast_path_limit: vars.persist_fast_path_limit(),
2308            reoptimize_imported_views: false,
2309            enable_join_prioritize_arranged: vars.enable_join_prioritize_arranged(),
2310            enable_projection_pushdown_after_relation_cse: vars
2311                .enable_projection_pushdown_after_relation_cse(),
2312            enable_less_reduce_in_eqprop: vars.enable_less_reduce_in_eqprop(),
2313            enable_dequadratic_eqprop_map: vars.enable_dequadratic_eqprop_map(),
2314            enable_eq_classes_withholding_errors: vars.enable_eq_classes_withholding_errors(),
2315            enable_fast_path_plan_insights: vars.enable_fast_path_plan_insights(),
2316            enable_cast_elimination: vars.enable_cast_elimination(),
2317            enable_case_literal_transform: vars.enable_case_literal_transform(),
2318            enable_simplify_quantified_comparisons: vars.enable_simplify_quantified_comparisons(),
2319            enable_coalesce_case_transform: vars.enable_coalesce_case_transform(),
2320            enable_will_distinct_propagation: vars.enable_will_distinct_propagation(),
2321        }
2322    }
2323}
2324
2325#[cfg(test)]
2326mod tests {
2327    use super::*;
2328    use crate::session::vars::SystemVars;
2329
2330    /// Ensure that all vars used for optimizer features have `enable_for_item_parsing = false`.
2331    ///
2332    /// This is important to ensure that plan caching works as intended during item parsing. Cached
2333    /// plans include the optimizer features they were produced with, and if they don't match on
2334    /// lookup, that results in a cache miss.
2335    #[mz_ore::test]
2336    fn optimizer_features_no_enable_for_item_parsing() {
2337        // Construct a `SystemVars` where all optimizer features are `false`.
2338        //
2339        // We do this in a roundabout way, by first constructing all-false `OptimizerFeatures` and
2340        // then assigning them to their respective system vars, to ensure we don't forget to update
2341        // this test when new optimizer features are added.
2342        let false_features = OptimizerFeatures::default();
2343        let OptimizerFeatures {
2344            enable_eq_classes_withholding_errors,
2345            enable_eager_delta_joins,
2346            enable_letrec_fixpoint_analysis,
2347            enable_new_outer_join_lowering,
2348            enable_reduce_mfp_fusion,
2349            enable_variadic_left_join_lowering,
2350            enable_cardinality_estimates,
2351            persist_fast_path_limit,
2352            reoptimize_imported_views,
2353            enable_join_prioritize_arranged,
2354            enable_projection_pushdown_after_relation_cse,
2355            enable_less_reduce_in_eqprop,
2356            enable_dequadratic_eqprop_map,
2357            enable_fast_path_plan_insights,
2358            enable_cast_elimination,
2359            enable_case_literal_transform,
2360            enable_simplify_quantified_comparisons,
2361            enable_coalesce_case_transform,
2362            enable_will_distinct_propagation,
2363        } = false_features;
2364
2365        let mut vars = SystemVars::new();
2366
2367        macro_rules! set_var {
2368            ($var:ident) => {
2369                vars.set(stringify!($var), VarInput::Flat(&$var.to_string()))
2370                    .unwrap();
2371            };
2372        }
2373
2374        set_var!(enable_eq_classes_withholding_errors);
2375        set_var!(enable_eager_delta_joins);
2376        set_var!(enable_letrec_fixpoint_analysis);
2377        set_var!(enable_new_outer_join_lowering);
2378        set_var!(enable_reduce_mfp_fusion);
2379        set_var!(enable_variadic_left_join_lowering);
2380        set_var!(enable_cardinality_estimates);
2381        set_var!(persist_fast_path_limit);
2382        let _ = reoptimize_imported_views; // no corresponding var
2383        set_var!(enable_join_prioritize_arranged);
2384        set_var!(enable_projection_pushdown_after_relation_cse);
2385        set_var!(enable_less_reduce_in_eqprop);
2386        set_var!(enable_dequadratic_eqprop_map);
2387        set_var!(enable_fast_path_plan_insights);
2388        set_var!(enable_cast_elimination);
2389        set_var!(enable_case_literal_transform);
2390        set_var!(enable_simplify_quantified_comparisons);
2391        set_var!(enable_coalesce_case_transform);
2392        set_var!(enable_will_distinct_propagation);
2393
2394        // Enable for item parsing, then ensure we still get the same optimizer features.
2395        vars.enable_for_item_parsing();
2396        let features_for_item_parsing = OptimizerFeatures::from(&vars);
2397        assert_eq!(features_for_item_parsing, false_features);
2398    }
2399}