Skip to main content

mz_sql/session/vars/
definitions.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Cow;
11use std::num::NonZeroU32;
12use std::str::FromStr;
13use std::sync::Arc;
14use std::sync::LazyLock;
15use std::time::Duration;
16
17use chrono::{DateTime, Utc};
18use derivative::Derivative;
19use mz_adapter_types::timestamp_oracle::{
20    DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT,
21    DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER,
22};
23use mz_ore::cast::{self, CastFrom};
24use mz_repr::adt::numeric::Numeric;
25use mz_repr::adt::timestamp::CheckedTimestamp;
26use mz_repr::bytes::ByteSize;
27use mz_repr::optimize::OptimizerFeatures;
28use mz_sql_parser::ast::Ident;
29use mz_sql_parser::ident;
30use mz_storage_types::parameters::REPLICA_STATUS_HISTORY_RETENTION_WINDOW_DEFAULT;
31use mz_storage_types::parameters::{
32    DEFAULT_PG_SOURCE_CONNECT_TIMEOUT, DEFAULT_PG_SOURCE_TCP_CONFIGURE_SERVER,
33    DEFAULT_PG_SOURCE_TCP_KEEPALIVES_IDLE, DEFAULT_PG_SOURCE_TCP_KEEPALIVES_INTERVAL,
34    DEFAULT_PG_SOURCE_TCP_KEEPALIVES_RETRIES, DEFAULT_PG_SOURCE_TCP_USER_TIMEOUT,
35    DEFAULT_PG_SOURCE_WAL_SENDER_TIMEOUT, STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT,
36};
37use mz_tracing::{CloneableEnvFilter, SerializableDirective};
38use uncased::UncasedStr;
39
40use crate::session::user::{SUPPORT_USER, SYSTEM_USER, User};
41use crate::session::vars::constraints::{
42    BYTESIZE_AT_LEAST_1MB, DomainConstraint, NUMERIC_BOUNDED_0_1_INCLUSIVE, NUMERIC_NON_NEGATIVE,
43    ValueConstraint,
44};
45use crate::session::vars::errors::VarError;
46use crate::session::vars::polyfill::{LazyValueFn, lazy_value, value};
47use crate::session::vars::value::{
48    ClientEncoding, ClientSeverity, DEFAULT_DATE_STYLE, Failpoints, IntervalStyle, IsolationLevel,
49    TimeZone, Value,
50};
51use crate::session::vars::{FeatureFlag, Var, VarInput, VarParseError};
52use crate::{DEFAULT_SCHEMA, WEBHOOK_CONCURRENCY_LIMIT};
53
54/// Definition of a variable.
55#[derive(Clone, Derivative)]
56#[derivative(Debug)]
57pub struct VarDefinition {
58    /// Name of the variable, case-insensitive matching.
59    pub name: &'static UncasedStr,
60    /// Description of the variable.
61    pub description: &'static str,
62    /// Is the variable visible to users, when false only visible to system users.
63    pub user_visible: bool,
64
65    /// Default compiled in value for this variable.
66    pub value: VarDefaultValue,
67    /// Constraint that must be upheld for this variable to be valid.
68    pub constraint: Option<ValueConstraint>,
69    /// When set, prevents getting or setting the variable unless the specified
70    /// feature flag is enabled.
71    pub require_feature_flag: Option<&'static FeatureFlag>,
72
73    /// Method to parse [`VarInput`] into a type that implements [`Value`].
74    ///
75    /// The reason `parse` exists as a function pointer is because we want to achieve two things:
76    ///   1. `VarDefinition` has no generic parameters.
77    ///   2. `Value::parse` returns an instance of `Self`.
78    /// `VarDefinition` holds a `dyn Value`, but `Value::parse` is not object safe because it
79    /// returns `Self`, so we can't call that method. We could change `Value::parse` to return a
80    /// `Box<dyn Value>` making it object safe, but that creates a footgun where it's possible for
81    /// `Value::parse` to return a type that isn't `Self`, e.g. `<String as Value>::parse` could
82    /// return a `usize`!
83    ///
84    /// So to prevent making `VarDefinition` generic over some type `V: Value`, but also defining
85    /// `Value::parse` as returning `Self`, we store a static function pointer to the `parse`
86    /// implementation of our default value.
87    #[derivative(Debug = "ignore")]
88    parse: fn(VarInput) -> Result<Box<dyn Value>, VarParseError>,
89    /// Returns a human readable name for the type of this variable. We store this as a static
90    /// function pointer for the same reason as `parse`.
91    #[derivative(Debug = "ignore")]
92    type_name: fn() -> Cow<'static, str>,
93}
94static_assertions::assert_impl_all!(VarDefinition: Send, Sync);
95
96impl VarDefinition {
97    /// Create a new [`VarDefinition`] in a const context with a value known at compile time.
98    pub const fn new<V: Value>(
99        name: &'static str,
100        value: &'static V,
101        description: &'static str,
102        user_visible: bool,
103    ) -> Self {
104        VarDefinition {
105            name: UncasedStr::new(name),
106            description,
107            value: VarDefaultValue::Static(value),
108            user_visible,
109            parse: V::parse_dyn_value,
110            type_name: V::type_name,
111            constraint: None,
112            require_feature_flag: None,
113        }
114    }
115
116    /// Create a new [`VarDefinition`] in a const context with a lazily evaluated value.
117    pub const fn new_lazy<V: Value, L: LazyValueFn<V>>(
118        name: &'static str,
119        _value: L,
120        description: &'static str,
121        user_visible: bool,
122    ) -> Self {
123        VarDefinition {
124            name: UncasedStr::new(name),
125            description,
126            value: VarDefaultValue::Lazy(L::LAZY_VALUE_FN),
127            user_visible,
128            parse: V::parse_dyn_value,
129            type_name: V::type_name,
130            constraint: None,
131            require_feature_flag: None,
132        }
133    }
134
135    /// Create a new [`VarDefinition`] with a value known at runtime.
136    pub fn new_runtime<V: Value>(
137        name: &'static str,
138        value: V,
139        description: &'static str,
140        user_visible: bool,
141    ) -> Self {
142        VarDefinition {
143            name: UncasedStr::new(name),
144            description,
145            value: VarDefaultValue::Runtime(Arc::new(value)),
146            user_visible,
147            parse: V::parse_dyn_value,
148            type_name: V::type_name,
149            constraint: None,
150            require_feature_flag: None,
151        }
152    }
153
154    /// TODO(parkmycar): Refactor this method onto a `VarDefinitionBuilder` that would allow us to
155    /// constrain `V` here to be the same `V` used in [`VarDefinition::new`].
156    pub const fn with_constraint<V: Value, D: DomainConstraint<Value = V>>(
157        mut self,
158        constraint: &'static D,
159    ) -> Self {
160        self.constraint = Some(ValueConstraint::Domain(constraint));
161        self
162    }
163
164    pub const fn fixed(mut self) -> Self {
165        self.constraint = Some(ValueConstraint::Fixed);
166        self
167    }
168
169    pub const fn read_only(mut self) -> Self {
170        self.constraint = Some(ValueConstraint::ReadOnly);
171        self
172    }
173
174    pub const fn with_feature_flag(mut self, feature_flag: &'static FeatureFlag) -> Self {
175        self.require_feature_flag = Some(feature_flag);
176        self
177    }
178
179    pub fn parse(&self, input: VarInput) -> Result<Box<dyn Value>, VarError> {
180        (self.parse)(input).map_err(|err| err.into_var_error(self))
181    }
182
183    pub fn default_value(&self) -> &'_ dyn Value {
184        self.value.value()
185    }
186}
187
188impl Var for VarDefinition {
189    fn name(&self) -> &'static str {
190        self.name.as_str()
191    }
192
193    fn value(&self) -> String {
194        self.default_value().format()
195    }
196
197    fn description(&self) -> &'static str {
198        self.description
199    }
200
201    fn type_name(&self) -> Cow<'static, str> {
202        (self.type_name)()
203    }
204
205    fn visible(&self, user: &User, system_vars: &super::SystemVars) -> Result<(), VarError> {
206        if !self.user_visible && user != &*SYSTEM_USER && user != &*SUPPORT_USER {
207            Err(VarError::UnknownParameter(self.name().to_string()))
208        } else if self.is_unsafe() && !system_vars.allow_unsafe() {
209            Err(VarError::RequiresUnsafeMode(self.name()))
210        } else {
211            if let Some(flag) = self.require_feature_flag {
212                flag.require(system_vars)?;
213            }
214
215            Ok(())
216        }
217    }
218}
219
220/// The kinds of compiled in default values that can be used with [`VarDefinition`].
221#[derive(Clone, Debug)]
222pub enum VarDefaultValue {
223    /// Static that can be evaluated at compile time.
224    Static(&'static dyn Value),
225    /// Lazy value that is defined at compile time, but created at runtime.
226    Lazy(fn() -> &'static dyn Value),
227    /// Value created at runtime. Note: This is generally an escape hatch.
228    Runtime(Arc<dyn Value>),
229}
230
231impl VarDefaultValue {
232    pub fn value(&self) -> &'_ dyn Value {
233        match self {
234            VarDefaultValue::Static(s) => *s,
235            VarDefaultValue::Lazy(l) => (l)(),
236            VarDefaultValue::Runtime(r) => r.as_ref(),
237        }
238    }
239}
240
// Materialize reports itself as PostgreSQL v9.5.0, the same version CockroachDB reports.
// Claiming a newer version makes some clients emit a "server too new" warning, while an
// older one makes some clients fall back to legacy code paths. Empirically, v9.5.0 is a
// good compromise.

/// Major component of the PostgreSQL version that Materialize claims to be.
pub const SERVER_MAJOR_VERSION: u8 = 9;

/// Minor component of the PostgreSQL version that Materialize claims to be.
pub const SERVER_MINOR_VERSION: u8 = 5;

/// Patch component of the PostgreSQL version that Materialize claims to be.
pub const SERVER_PATCH_VERSION: u8 = 0;

/// Name of the database that Materialize uses by default.
pub const DEFAULT_DATABASE_NAME: &str = "materialize";
257
258pub static APPLICATION_NAME: VarDefinition = VarDefinition::new(
259    "application_name",
260    value!(String; String::new()),
261    "Sets the application name to be reported in statistics and logs (PostgreSQL).",
262    true,
263);
264
265pub static CLIENT_ENCODING: VarDefinition = VarDefinition::new(
266    "client_encoding",
267    value!(ClientEncoding; ClientEncoding::Utf8),
268    "Sets the client's character set encoding (PostgreSQL).",
269    true,
270);
271
272pub static CLIENT_MIN_MESSAGES: VarDefinition = VarDefinition::new(
273    "client_min_messages",
274    value!(ClientSeverity; ClientSeverity::Notice),
275    "Sets the message levels that are sent to the client (PostgreSQL).",
276    true,
277);
278
279pub static CLUSTER: VarDefinition = VarDefinition::new_lazy(
280    "cluster",
281    lazy_value!(String; || "quickstart".to_string()),
282    "Sets the current cluster (Materialize).",
283    true,
284);
285
286pub static CLUSTER_REPLICA: VarDefinition = VarDefinition::new(
287    "cluster_replica",
288    value!(Option<String>; None),
289    "Sets a target cluster replica for SELECT queries (Materialize).",
290    true,
291);
292
293pub static CURRENT_OBJECT_MISSING_WARNINGS: VarDefinition = VarDefinition::new(
294    "current_object_missing_warnings",
295    value!(bool; true),
296    "Whether to emit warnings when the current database, schema, or cluster is missing (Materialize).",
297    true,
298);
299
300pub static DATABASE: VarDefinition = VarDefinition::new_lazy(
301    "database",
302    lazy_value!(String; || DEFAULT_DATABASE_NAME.to_string()),
303    "Sets the current database (CockroachDB).",
304    true,
305);
306
307pub static DATE_STYLE: VarDefinition = VarDefinition::new(
308    // DateStyle has nonstandard capitalization for historical reasons.
309    "DateStyle",
310    &DEFAULT_DATE_STYLE,
311    "Sets the display format for date and time values (PostgreSQL).",
312    true,
313);
314
315pub static DEFAULT_CLUSTER_REPLICATION_FACTOR: VarDefinition = VarDefinition::new(
316    "default_cluster_replication_factor",
317    value!(u32; 1),
318    "Default cluster replication factor (Materialize).",
319    true,
320);
321
322pub static EXTRA_FLOAT_DIGITS: VarDefinition = VarDefinition::new(
323    "extra_float_digits",
324    value!(i32; 3),
325    "Adjusts the number of digits displayed for floating-point values (PostgreSQL).",
326    true,
327);
328
329pub static FAILPOINTS: VarDefinition = VarDefinition::new(
330    "failpoints",
331    value!(Failpoints; Failpoints),
332    "Allows failpoints to be dynamically activated.",
333    true,
334);
335
336pub static INTEGER_DATETIMES: VarDefinition = VarDefinition::new(
337    "integer_datetimes",
338    value!(bool; true),
339    "Reports whether the server uses 64-bit-integer dates and times (PostgreSQL).",
340    true,
341)
342.fixed();
343
344pub static INTERVAL_STYLE: VarDefinition = VarDefinition::new(
345    // IntervalStyle has nonstandard capitalization for historical reasons.
346    "IntervalStyle",
347    value!(IntervalStyle; IntervalStyle::Postgres),
348    "Sets the display format for interval values (PostgreSQL).",
349    true,
350);
351
352pub const MZ_VERSION_NAME: &UncasedStr = UncasedStr::new("mz_version");
353pub const IS_SUPERUSER_NAME: &UncasedStr = UncasedStr::new("is_superuser");
354
355// Schema can be used an alias for a search path with a single element.
356pub const SCHEMA_ALIAS: &UncasedStr = UncasedStr::new("schema");
357pub static SEARCH_PATH: VarDefinition = VarDefinition::new_lazy(
358    "search_path",
359    lazy_value!(Vec<Ident>; || vec![ident!(DEFAULT_SCHEMA)]),
360    "Sets the schema search order for names that are not schema-qualified (PostgreSQL).",
361    true,
362);
363
364pub static STATEMENT_TIMEOUT: VarDefinition = VarDefinition::new(
365    "statement_timeout",
366    value!(Duration; Duration::from_secs(60)),
367    "Sets the maximum allowed duration of INSERT...SELECT, UPDATE, and DELETE operations. \
368    If this value is specified without units, it is taken as milliseconds.",
369    true,
370);
371
372pub static IDLE_IN_TRANSACTION_SESSION_TIMEOUT: VarDefinition = VarDefinition::new(
373    "idle_in_transaction_session_timeout",
374    value!(Duration; Duration::from_secs(60 * 2)),
375    "Sets the maximum allowed duration that a session can sit idle in a transaction before \
376    being terminated. If this value is specified without units, it is taken as milliseconds. \
377    A value of zero disables the timeout (PostgreSQL).",
378    true,
379);
380
381pub static SERVER_VERSION: VarDefinition = VarDefinition::new_lazy(
382    "server_version",
383    lazy_value!(String; || {
384        format!("{SERVER_MAJOR_VERSION}.{SERVER_MINOR_VERSION}.{SERVER_PATCH_VERSION}")
385    }),
386    "Shows the PostgreSQL compatible server version (PostgreSQL).",
387    true,
388)
389.read_only();
390
391pub static SERVER_VERSION_NUM: VarDefinition = VarDefinition::new(
392    "server_version_num",
393    value!(i32; (cast::u8_to_i32(SERVER_MAJOR_VERSION) * 10_000)
394        + (cast::u8_to_i32(SERVER_MINOR_VERSION) * 100)
395        + cast::u8_to_i32(SERVER_PATCH_VERSION)),
396    "Shows the PostgreSQL compatible server version as an integer (PostgreSQL).",
397    true,
398)
399.read_only();
400
401pub static SQL_SAFE_UPDATES: VarDefinition = VarDefinition::new(
402    "sql_safe_updates",
403    value!(bool; false),
404    "Prohibits SQL statements that may be overly destructive (CockroachDB).",
405    true,
406);
407
408pub static STANDARD_CONFORMING_STRINGS: VarDefinition = VarDefinition::new(
409    "standard_conforming_strings",
410    value!(bool; true),
411    "Causes '...' strings to treat backslashes literally (PostgreSQL).",
412    true,
413)
414.fixed();
415
416pub static TIMEZONE: VarDefinition = VarDefinition::new(
417    // TimeZone has nonstandard capitalization for historical reasons.
418    "TimeZone",
419    value!(TimeZone; TimeZone::UTC),
420    "Sets the time zone for displaying and interpreting time stamps (PostgreSQL).",
421    true,
422);
423
424pub const TRANSACTION_ISOLATION_VAR_NAME: &str = "transaction_isolation";
425pub static TRANSACTION_ISOLATION: VarDefinition = VarDefinition::new(
426    TRANSACTION_ISOLATION_VAR_NAME,
427    value!(IsolationLevel; IsolationLevel::StrictSerializable),
428    "Sets the current transaction's isolation level (PostgreSQL).",
429    true,
430);
431
432pub static MAX_KAFKA_CONNECTIONS: VarDefinition = VarDefinition::new(
433    "max_kafka_connections",
434    value!(u32; 1000),
435    "The maximum number of Kafka connections in the region, across all schemas (Materialize).",
436    true,
437);
438
439pub static MAX_POSTGRES_CONNECTIONS: VarDefinition = VarDefinition::new(
440    "max_postgres_connections",
441    value!(u32; 1000),
442    "The maximum number of PostgreSQL connections in the region, across all schemas (Materialize).",
443    true,
444);
445
446pub static MAX_MYSQL_CONNECTIONS: VarDefinition = VarDefinition::new(
447    "max_mysql_connections",
448    value!(u32; 1000),
449    "The maximum number of MySQL connections in the region, across all schemas (Materialize).",
450    true,
451);
452
453pub static MAX_SQL_SERVER_CONNECTIONS: VarDefinition = VarDefinition::new(
454    "max_sql_server_connections",
455    value!(u32; 1000),
456    "The maximum number of SQL Server connections in the region, across all schemas (Materialize).",
457    true,
458);
459
460pub static MAX_AWS_PRIVATELINK_CONNECTIONS: VarDefinition = VarDefinition::new(
461    "max_aws_privatelink_connections",
462    value!(u32; 0),
463    "The maximum number of AWS PrivateLink connections in the region, across all schemas (Materialize).",
464    true,
465);
466
467pub static MAX_TABLES: VarDefinition = VarDefinition::new(
468    "max_tables",
469    value!(u32; 200),
470    "The maximum number of tables in the region, across all schemas (Materialize).",
471    true,
472);
473
474pub static MAX_SOURCES: VarDefinition = VarDefinition::new(
475    "max_sources",
476    value!(u32; 200),
477    "The maximum number of sources in the region, across all schemas (Materialize).",
478    true,
479);
480
481pub static MAX_SINKS: VarDefinition = VarDefinition::new(
482    "max_sinks",
483    value!(u32; 1000),
484    "The maximum number of sinks in the region, across all schemas (Materialize).",
485    true,
486);
487
488pub static MAX_MATERIALIZED_VIEWS: VarDefinition = VarDefinition::new(
489    "max_materialized_views",
490    value!(u32; 500),
491    "The maximum number of materialized views in the region, across all schemas (Materialize).",
492    true,
493);
494
495pub static MAX_CLUSTERS: VarDefinition = VarDefinition::new(
496    "max_clusters",
497    value!(u32; 25),
498    "The maximum number of clusters in the region (Materialize).",
499    true,
500);
501
502pub static MAX_REPLICAS_PER_CLUSTER: VarDefinition = VarDefinition::new(
503    "max_replicas_per_cluster",
504    value!(u32; 5),
505    "The maximum number of replicas of a single cluster (Materialize).",
506    true,
507);
508
509pub static MAX_CREDIT_CONSUMPTION_RATE: VarDefinition = VarDefinition::new_lazy(
510    "max_credit_consumption_rate",
511    lazy_value!(Numeric; || 1024.into()),
512    "The maximum rate of credit consumption in a region. Credits are consumed based on the size of cluster replicas in use (Materialize).",
513    true,
514)
515.with_constraint(&NUMERIC_NON_NEGATIVE);
516
517pub static MAX_DATABASES: VarDefinition = VarDefinition::new(
518    "max_databases",
519    value!(u32; 1000),
520    "The maximum number of databases in the region (Materialize).",
521    true,
522);
523
524pub static MAX_SCHEMAS_PER_DATABASE: VarDefinition = VarDefinition::new(
525    "max_schemas_per_database",
526    value!(u32; 1000),
527    "The maximum number of schemas in a database (Materialize).",
528    true,
529);
530
531pub static MAX_OBJECTS_PER_SCHEMA: VarDefinition = VarDefinition::new(
532    "max_objects_per_schema",
533    value!(u32; 1000),
534    "The maximum number of objects in a schema (Materialize).",
535    true,
536);
537
538pub static MAX_SECRETS: VarDefinition = VarDefinition::new(
539    "max_secrets",
540    value!(u32; 100),
541    "The maximum number of secrets in the region, across all schemas (Materialize).",
542    true,
543);
544
545pub static MAX_ROLES: VarDefinition = VarDefinition::new(
546    "max_roles",
547    value!(u32; 1000),
548    "The maximum number of roles in the region (Materialize).",
549    true,
550);
551
552pub static MAX_NETWORK_POLICIES: VarDefinition = VarDefinition::new(
553    "max_network_policies",
554    value!(u32; 25),
555    "The maximum number of network policies in the region.",
556    true,
557);
558
559pub static MAX_RULES_PER_NETWORK_POLICY: VarDefinition = VarDefinition::new(
560    "max_rules_per_network_policy",
561    value!(u32; 25),
562    "The maximum number of rules per network policies.",
563    true,
564);
565
566// Cloud environmentd is configured with 4 GiB of RAM, so 1 GiB is a good heuristic for a single
567// query.
568//
569// We constrain this parameter to a minimum of 1MB, to avoid accidental usage of values that will
570// interfere with queries executed by the system itself.
571//
572// TODO(jkosh44) Eventually we want to be able to return arbitrary sized results.
573pub static MAX_RESULT_SIZE: VarDefinition = VarDefinition::new(
574    "max_result_size",
575    value!(ByteSize; ByteSize::gb(1)),
576    "The maximum size in bytes for an internal query result (Materialize).",
577    true,
578)
579.with_constraint(&BYTESIZE_AT_LEAST_1MB);
580
581pub static MAX_QUERY_RESULT_SIZE: VarDefinition = VarDefinition::new(
582    "max_query_result_size",
583    value!(ByteSize; ByteSize::gb(1)),
584    "The maximum size in bytes for a single query's result (Materialize).",
585    true,
586);
587
588pub static MAX_COPY_FROM_ROW_SIZE: VarDefinition = VarDefinition::new(
589    "max_copy_from_row_size",
590    value!(ByteSize; ByteSize::mb(128)),
591    "The maximum size in bytes for a single COPY FROM STDIN row (Materialize).",
592    true,
593);
594
595pub static MAX_IDENTIFIER_LENGTH: VarDefinition = VarDefinition::new(
596    "max_identifier_length",
597    value!(usize; mz_sql_lexer::lexer::MAX_IDENTIFIER_LENGTH),
598    "The maximum length of object identifiers in bytes (PostgreSQL).",
599    true,
600);
601
602pub static WELCOME_MESSAGE: VarDefinition = VarDefinition::new(
603    "welcome_message",
604    value!(bool; true),
605    "Whether to send a notice with a welcome message after a successful connection (Materialize).",
606    true,
607);
608
609/// The logical compaction window for builtin tables and sources that have the
610/// `retained_metrics_relation` flag set.
611///
612/// The existence of this variable is a bit of a hack until we have a fully
613/// general solution for controlling retention windows.
614pub static METRICS_RETENTION: VarDefinition = VarDefinition::new(
615    "metrics_retention",
616    // 30 days
617    value!(Duration; Duration::from_secs(30 * 24 * 60 * 60)),
618    "The time to retain cluster utilization metrics (Materialize).",
619    false,
620);
621
622pub static ALLOWED_CLUSTER_REPLICA_SIZES: VarDefinition = VarDefinition::new(
623    "allowed_cluster_replica_sizes",
624    value!(Vec<Ident>; Vec::new()),
625    "The allowed sizes when creating a new cluster replica (Materialize).",
626    true,
627);
628
629pub static PERSIST_FAST_PATH_LIMIT: VarDefinition = VarDefinition::new(
630    "persist_fast_path_limit",
631    value!(usize; 25),
632    "An exclusive upper bound on the number of results we may return from a Persist fast-path peek; \
633    queries that may return more results will follow the normal / slow path. \
634    Setting this to 0 disables the feature.",
635    false,
636);
637
638/// Controls `mz_adapter::coord::timestamp_oracle::postgres_oracle::DynamicConfig::pg_connection_pool_max_size`.
639pub static PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE: VarDefinition = VarDefinition::new(
640    "pg_timestamp_oracle_connection_pool_max_size",
641    value!(usize; DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE),
642    "Maximum size of the Postgres/CRDB connection pool, used by the Postgres/CRDB timestamp oracle.",
643    false,
644);
645
646/// Controls `mz_adapter::coord::timestamp_oracle::postgres_oracle::DynamicConfig::pg_connection_pool_max_wait`.
647pub static PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT: VarDefinition = VarDefinition::new(
648    "pg_timestamp_oracle_connection_pool_max_wait",
649    value!(Option<Duration>; Some(DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT)),
650    "The maximum time to wait when attempting to obtain a connection from the Postgres/CRDB connection pool, used by the Postgres/CRDB timestamp oracle.",
651    false,
652);
653
654/// Controls `mz_adapter::coord::timestamp_oracle::postgres_oracle::DynamicConfig::pg_connection_pool_ttl`.
655pub static PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL: VarDefinition = VarDefinition::new(
656    "pg_timestamp_oracle_connection_pool_ttl",
657    value!(Duration; DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL),
658    "The minimum TTL of a Consensus connection to Postgres/CRDB before it is proactively terminated",
659    false,
660);
661
662/// Controls `mz_adapter::coord::timestamp_oracle::postgres_oracle::DynamicConfig::pg_connection_pool_ttl_stagger`.
663pub static PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER: VarDefinition = VarDefinition::new(
664    "pg_timestamp_oracle_connection_pool_ttl_stagger",
665    value!(Duration; DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER),
666    "The minimum time between TTLing Consensus connections to Postgres/CRDB.",
667    false,
668);
669
670pub static UNSAFE_NEW_TRANSACTION_WALL_TIME: VarDefinition = VarDefinition::new(
671    "unsafe_new_transaction_wall_time",
672    value!(Option<CheckedTimestamp<DateTime<Utc>>>; None),
673    "Sets the wall time for all new explicit or implicit transactions to control the value of `now()`. \
674    If not set, uses the system's clock.",
675    // This needs to be true because `user_visible: false` things are only modifiable by the mz_system
676    // and mz_support users, and we want sqllogictest to have access with its user. Because the name
677    // starts with "unsafe" it still won't be visible or changeable by users unless unsafe mode is
678    // enabled.
679    true,
680);
681
682pub static SCRAM_ITERATIONS: VarDefinition = VarDefinition::new(
683    "scram_iterations",
684    // / The default iteration count as suggested by
685    // / <https://cheatsheetseries.owasp.org/cheatsheets/Password_Storage_Cheat_Sheet.html>
686    value!(NonZeroU32; NonZeroU32::new(600_000).unwrap()),
687    "Iterations to use when hashing passwords. Higher iterations are more secure, but take longer to validated. \
688    Please consider the security risks before reducing this below the default value.",
689    true,
690);
691
692/// Tuning for RocksDB used by `UPSERT` sources that takes effect on restart.
693pub mod upsert_rocksdb {
694    use super::*;
695    use mz_rocksdb_types::config::{CompactionStyle, CompressionType};
696
697    pub static UPSERT_ROCKSDB_COMPACTION_STYLE: VarDefinition = VarDefinition::new(
698        "upsert_rocksdb_compaction_style",
699        value!(CompactionStyle; mz_rocksdb_types::defaults::DEFAULT_COMPACTION_STYLE),
700        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
701        sources. Described in the `mz_rocksdb_types::config` module. \
702        Only takes effect on source restart (Materialize).",
703        false,
704    );
705
706    pub static UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET: VarDefinition =
707        VarDefinition::new(
708            "upsert_rocksdb_optimize_compaction_memtable_budget",
709            value!(usize; mz_rocksdb_types::defaults::DEFAULT_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET),
710            "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
711        sources. Described in the `mz_rocksdb_types::config` module. \
712        Only takes effect on source restart (Materialize).",
713            false,
714        );
715
716    pub static UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES: VarDefinition =
717        VarDefinition::new(
718            "upsert_rocksdb_level_compaction_dynamic_level_bytes",
719            value!(bool; mz_rocksdb_types::defaults::DEFAULT_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES),
720            "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
721        sources. Described in the `mz_rocksdb_types::config` module. \
722        Only takes effect on source restart (Materialize).",
723            false,
724        );
725
726    pub static UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO: VarDefinition = VarDefinition::new(
727        "upsert_rocksdb_universal_compaction_ratio",
728        value!(i32; mz_rocksdb_types::defaults::DEFAULT_UNIVERSAL_COMPACTION_RATIO),
729        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
730        sources. Described in the `mz_rocksdb_types::config` module. \
731        Only takes effect on source restart (Materialize).",
732        false,
733    );
734
735    pub static UPSERT_ROCKSDB_PARALLELISM: VarDefinition = VarDefinition::new(
736        "upsert_rocksdb_parallelism",
737        value!(Option<i32>; mz_rocksdb_types::defaults::DEFAULT_PARALLELISM),
738        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
739        sources. Described in the `mz_rocksdb_types::config` module. \
740        Only takes effect on source restart (Materialize).",
741        false,
742    );
743
744    pub static UPSERT_ROCKSDB_COMPRESSION_TYPE: VarDefinition = VarDefinition::new(
745        "upsert_rocksdb_compression_type",
746        value!(CompressionType; mz_rocksdb_types::defaults::DEFAULT_COMPRESSION_TYPE),
747        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
748        sources. Described in the `mz_rocksdb_types::config` module. \
749        Only takes effect on source restart (Materialize).",
750        false,
751    );
752
753    pub static UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE: VarDefinition = VarDefinition::new(
754        "upsert_rocksdb_bottommost_compression_type",
755        value!(CompressionType; mz_rocksdb_types::defaults::DEFAULT_BOTTOMMOST_COMPRESSION_TYPE),
756        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
757        sources. Described in the `mz_rocksdb_types::config` module. \
758        Only takes effect on source restart (Materialize).",
759        false,
760    );
761
762    pub static UPSERT_ROCKSDB_BATCH_SIZE: VarDefinition = VarDefinition::new(
763        "upsert_rocksdb_batch_size",
764        value!(usize; mz_rocksdb_types::defaults::DEFAULT_BATCH_SIZE),
765        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
766        sources. Described in the `mz_rocksdb_types::config` module. \
767        Can be changed dynamically (Materialize).",
768        false,
769    );
770
771    pub static UPSERT_ROCKSDB_RETRY_DURATION: VarDefinition = VarDefinition::new(
772        "upsert_rocksdb_retry_duration",
773        value!(Duration; mz_rocksdb_types::defaults::DEFAULT_RETRY_DURATION),
774        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
775        sources. Described in the `mz_rocksdb_types::config` module. \
776        Only takes effect on source restart (Materialize).",
777        false,
778    );
779
780    pub static UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS: VarDefinition = VarDefinition::new(
781        "upsert_rocksdb_stats_log_interval_seconds",
782        value!(u32; mz_rocksdb_types::defaults::DEFAULT_STATS_LOG_INTERVAL_S),
783        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
784        sources. Described in the `mz_rocksdb_types::config` module. \
785        Only takes effect on source restart (Materialize).",
786        false,
787    );
788
789    pub static UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS: VarDefinition = VarDefinition::new(
790        "upsert_rocksdb_stats_persist_interval_seconds",
791        value!(u32; mz_rocksdb_types::defaults::DEFAULT_STATS_PERSIST_INTERVAL_S),
792        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
793        sources. Described in the `mz_rocksdb_types::config` module. \
794        Only takes effect on source restart (Materialize).",
795        false,
796    );
797
798    pub static UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB: VarDefinition = VarDefinition::new(
799        "upsert_rocksdb_point_lookup_block_cache_size_mb",
800        value!(Option<u32>; None),
801        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
802        sources. Described in the `mz_rocksdb_types::config` module. \
803        Only takes effect on source restart (Materialize).",
804        false,
805    );
806
807    /// The number of times by which allocated buffers will be shrinked in upsert rocksdb.
808    /// If value is 0, then no shrinking will occur.
809    pub static UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO: VarDefinition = VarDefinition::new(
810        "upsert_rocksdb_shrink_allocated_buffers_by_ratio",
811        value!(usize; mz_rocksdb_types::defaults::DEFAULT_SHRINK_BUFFERS_BY_RATIO),
812        "The number of times by which allocated buffers will be shrinked in upsert rocksdb.",
813        false,
814    );
815
816    /// Only used if `upsert_rocksdb_write_buffer_manager_memory_bytes` is also set
817    /// and write buffer manager is enabled
818    pub static UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION: VarDefinition =
819        VarDefinition::new(
820            "upsert_rocksdb_write_buffer_manager_cluster_memory_fraction",
821            value!(Option<Numeric>; None),
822            "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
823        sources. Described in the `mz_rocksdb_types::config` module. \
824        Only takes effect on source restart (Materialize).",
825            false,
826        );
827
828    /// `upsert_rocksdb_write_buffer_manager_memory_bytes` needs to be set for write buffer manager to be
829    /// used.
830    pub static UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES: VarDefinition = VarDefinition::new(
831        "upsert_rocksdb_write_buffer_manager_memory_bytes",
832        value!(Option<usize>; None),
833        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
834        sources. Described in the `mz_rocksdb_types::config` module. \
835        Only takes effect on source restart (Materialize).",
836        false,
837    );
838
839    pub static UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL: VarDefinition = VarDefinition::new(
840        "upsert_rocksdb_write_buffer_manager_allow_stall",
841        value!(bool; false),
842        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
843        sources. Described in the `mz_rocksdb_types::config` module. \
844        Only takes effect on source restart (Materialize).",
845        false,
846    );
847}
848
849pub static LOGGING_FILTER: VarDefinition = VarDefinition::new_lazy(
850    "log_filter",
851    lazy_value!(CloneableEnvFilter; || CloneableEnvFilter::from_str("info").expect("valid EnvFilter")),
852    "Sets the filter to apply to stderr logging.",
853    false,
854);
855
856pub static OPENTELEMETRY_FILTER: VarDefinition = VarDefinition::new_lazy(
857    "opentelemetry_filter",
858    lazy_value!(CloneableEnvFilter; || CloneableEnvFilter::from_str("info").expect("valid EnvFilter")),
859    "Sets the filter to apply to OpenTelemetry-backed distributed tracing.",
860    false,
861);
862
863pub static LOGGING_FILTER_DEFAULTS: VarDefinition = VarDefinition::new_lazy(
864    "log_filter_defaults",
865    lazy_value!(Vec<SerializableDirective>; || {
866        mz_ore::tracing::LOGGING_DEFAULTS
867            .iter()
868            .map(|d| d.clone().into())
869            .collect()
870    }),
871    "Sets additional default directives to apply to stderr logging. \
872        These apply to all variations of `log_filter`. Directives other than \
873        `module=off` are likely incorrect.",
874    false,
875);
876
877pub static OPENTELEMETRY_FILTER_DEFAULTS: VarDefinition = VarDefinition::new_lazy(
878    "opentelemetry_filter_defaults",
879    lazy_value!(Vec<SerializableDirective>; || {
880        mz_ore::tracing::OPENTELEMETRY_DEFAULTS
881            .iter()
882            .map(|d| d.clone().into())
883            .collect()
884    }),
885    "Sets additional default directives to apply to OpenTelemetry-backed \
886        distributed tracing. \
887        These apply to all variations of `opentelemetry_filter`. Directives other than \
888        `module=off` are likely incorrect.",
889    false,
890);
891
892pub static SENTRY_FILTERS: VarDefinition = VarDefinition::new_lazy(
893    "sentry_filters",
894    lazy_value!(Vec<SerializableDirective>; || {
895        mz_ore::tracing::SENTRY_DEFAULTS
896            .iter()
897            .map(|d| d.clone().into())
898            .collect()
899    }),
900    "Sets additional default directives to apply to sentry logging. \
901        These apply on top of a default `info` directive. Directives other than \
902        `module=off` are likely incorrect.",
903    false,
904);
905
906pub static WEBHOOKS_SECRETS_CACHING_TTL_SECS: VarDefinition = VarDefinition::new_lazy(
907    "webhooks_secrets_caching_ttl_secs",
908    lazy_value!(usize; || {
909        usize::cast_from(mz_secrets::cache::DEFAULT_TTL_SECS)
910    }),
911    "Sets the time-to-live for values in the Webhooks secrets cache.",
912    false,
913);
914
915pub static COORD_SLOW_MESSAGE_WARN_THRESHOLD: VarDefinition = VarDefinition::new(
916    "coord_slow_message_warn_threshold",
917    value!(Duration; Duration::from_secs(30)),
918    "Sets the threshold at which we will error! for a coordinator message being slow.",
919    false,
920);
921
922/// Controls the connect_timeout setting when connecting to PG via `mz_postgres_util`.
923pub static PG_SOURCE_CONNECT_TIMEOUT: VarDefinition = VarDefinition::new(
924    "pg_source_connect_timeout",
925    value!(Duration; DEFAULT_PG_SOURCE_CONNECT_TIMEOUT),
926    "Sets the timeout applied to socket-level connection attempts for PG \
927    replication connections (Materialize).",
928    false,
929);
930
931/// Sets the maximum number of TCP keepalive probes that will be sent before dropping a connection
932/// when connecting to PG via `mz_postgres_util`.
933pub static PG_SOURCE_TCP_KEEPALIVES_RETRIES: VarDefinition = VarDefinition::new(
934    "pg_source_tcp_keepalives_retries",
935    value!(u32; DEFAULT_PG_SOURCE_TCP_KEEPALIVES_RETRIES),
936    "Sets the maximum number of TCP keepalive probes that will be sent before dropping \
937    a connection when connecting to PG via `mz_postgres_util` (Materialize).",
938    false,
939);
940
941/// Sets the amount of idle time before a keepalive packet is sent on the connection when connecting
942/// to PG via `mz_postgres_util`.
943pub static PG_SOURCE_TCP_KEEPALIVES_IDLE: VarDefinition = VarDefinition::new(
944    "pg_source_tcp_keepalives_idle",
945    value!(Duration; DEFAULT_PG_SOURCE_TCP_KEEPALIVES_IDLE),
946    "Sets the amount of idle time before a keepalive packet is sent on the connection \
947        when connecting to PG via `mz_postgres_util` (Materialize).",
948    false,
949);
950
951/// Sets the time interval between TCP keepalive probes when connecting to PG via `mz_postgres_util`.
952pub static PG_SOURCE_TCP_KEEPALIVES_INTERVAL: VarDefinition = VarDefinition::new(
953    "pg_source_tcp_keepalives_interval",
954    value!(Duration; DEFAULT_PG_SOURCE_TCP_KEEPALIVES_INTERVAL),
955    "Sets the time interval between TCP keepalive probes when connecting to PG via \
956        replication (Materialize).",
957    false,
958);
959
960/// Sets the TCP user timeout when connecting to PG via `mz_postgres_util`.
961pub static PG_SOURCE_TCP_USER_TIMEOUT: VarDefinition = VarDefinition::new(
962    "pg_source_tcp_user_timeout",
963    value!(Duration; DEFAULT_PG_SOURCE_TCP_USER_TIMEOUT),
964    "Sets the TCP user timeout when connecting to PG via `mz_postgres_util` (Materialize).",
965    false,
966);
967
968/// Sets whether to apply the TCP configuration parameters on the server when
969/// connecting to PG via `mz_postgres_util`.
970pub static PG_SOURCE_TCP_CONFIGURE_SERVER: VarDefinition = VarDefinition::new(
971    "pg_source_tcp_configure_server",
972    value!(bool; DEFAULT_PG_SOURCE_TCP_CONFIGURE_SERVER),
973    "Sets whether to apply the TCP configuration parameters on the server when connecting to PG via `mz_postgres_util` (Materialize).",
974    false,
975);
976
977/// Sets the `statement_timeout` value to use during the snapshotting phase of
978/// PG sources.
979pub static PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT: VarDefinition = VarDefinition::new(
980    "pg_source_snapshot_statement_timeout",
981    value!(Duration; mz_postgres_util::DEFAULT_SNAPSHOT_STATEMENT_TIMEOUT),
982    "Sets the `statement_timeout` value to use during the snapshotting phase of PG sources (Materialize)",
983    false,
984);
985
986/// Sets the `wal_sender_timeout` value to use during the replication phase of
987/// PG sources.
988pub static PG_SOURCE_WAL_SENDER_TIMEOUT: VarDefinition = VarDefinition::new(
989    "pg_source_wal_sender_timeout",
990    value!(Option<Duration>; DEFAULT_PG_SOURCE_WAL_SENDER_TIMEOUT),
991    "Sets the `wal_sender_timeout` value to use during the replication phase of PG sources (Materialize)",
992    false,
993);
994
995/// Please see `PgSourceSnapshotConfig`.
996pub static PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT: VarDefinition = VarDefinition::new(
997    "pg_source_snapshot_collect_strict_count",
998    value!(bool; mz_storage_types::parameters::PgSourceSnapshotConfig::new().collect_strict_count),
999    "Please see <https://dev.materialize.com/api/rust-private\
1000        /mz_storage_types/parameters\
1001        /struct.PgSourceSnapshotConfig.html#structfield.collect_strict_count>",
1002    false,
1003);
1004
1005/// Sets the time between TCP keepalive probes when connecting to MySQL via `mz_mysql_util`.
1006pub static MYSQL_SOURCE_TCP_KEEPALIVE: VarDefinition = VarDefinition::new(
1007    "mysql_source_tcp_keepalive",
1008    value!(Duration; mz_mysql_util::DEFAULT_TCP_KEEPALIVE),
1009    "Sets the time between TCP keepalive probes when connecting to MySQL",
1010    false,
1011);
1012
1013/// Sets the `max_execution_time` value to use during the snapshotting phase of
1014/// MySQL sources.
1015pub static MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME: VarDefinition = VarDefinition::new(
1016    "mysql_source_snapshot_max_execution_time",
1017    value!(Duration; mz_mysql_util::DEFAULT_SNAPSHOT_MAX_EXECUTION_TIME),
1018    "Sets the `max_execution_time` value to use during the snapshotting phase of MySQL sources (Materialize)",
1019    false,
1020);
1021
1022/// Sets the `lock_wait_timeout` value to use during the snapshotting phase of
1023/// MySQL sources.
1024pub static MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT: VarDefinition = VarDefinition::new(
1025    "mysql_source_snapshot_lock_wait_timeout",
1026    value!(Duration; mz_mysql_util::DEFAULT_SNAPSHOT_LOCK_WAIT_TIMEOUT),
1027    "Sets the `lock_wait_timeout` value to use during the snapshotting phase of MySQL sources (Materialize)",
1028    false,
1029);
1030
1031/// Sets the timeout for establishing an authenticated connection to MySQL
1032pub static MYSQL_SOURCE_CONNECT_TIMEOUT: VarDefinition = VarDefinition::new(
1033    "mysql_source_connect_timeout",
1034    value!(Duration; mz_mysql_util::DEFAULT_CONNECT_TIMEOUT),
1035    "Sets the timeout for establishing an authenticated connection to MySQL",
1036    false,
1037);
1038
1039/// Controls the check interval for connections to SSH bastions via `mz_ssh_util`.
1040pub static SSH_CHECK_INTERVAL: VarDefinition = VarDefinition::new(
1041    "ssh_check_interval",
1042    value!(Duration; mz_ssh_util::tunnel::DEFAULT_CHECK_INTERVAL),
1043    "Controls the check interval for connections to SSH bastions via `mz_ssh_util`.",
1044    false,
1045);
1046
1047/// Controls the connect timeout for connections to SSH bastions via `mz_ssh_util`.
1048pub static SSH_CONNECT_TIMEOUT: VarDefinition = VarDefinition::new(
1049    "ssh_connect_timeout",
1050    value!(Duration; mz_ssh_util::tunnel::DEFAULT_CONNECT_TIMEOUT),
1051    "Controls the connect timeout for connections to SSH bastions via `mz_ssh_util`.",
1052    false,
1053);
1054
1055/// Controls the keepalive idle interval for connections to SSH bastions via `mz_ssh_util`.
1056pub static SSH_KEEPALIVES_IDLE: VarDefinition = VarDefinition::new(
1057    "ssh_keepalives_idle",
1058    value!(Duration; mz_ssh_util::tunnel::DEFAULT_KEEPALIVES_IDLE),
1059    "Controls the keepalive idle interval for connections to SSH bastions via `mz_ssh_util`.",
1060    false,
1061);
1062
1063/// Enables `socket.keepalive.enable` for rdkafka client connections. Defaults to true.
1064pub static KAFKA_SOCKET_KEEPALIVE: VarDefinition = VarDefinition::new(
1065    "kafka_socket_keepalive",
1066    value!(bool; mz_kafka_util::client::DEFAULT_KEEPALIVE),
1067    "Enables `socket.keepalive.enable` for rdkafka client connections. Defaults to true.",
1068    false,
1069);
1070
1071/// Controls `socket.timeout.ms` for rdkafka client connections. Defaults to the rdkafka default
1072/// (60000ms). Cannot be greater than 300000ms, more than 100ms greater than
1073/// `kafka_transaction_timeout`, or less than 10ms.
1074pub static KAFKA_SOCKET_TIMEOUT: VarDefinition = VarDefinition::new(
1075    "kafka_socket_timeout",
1076    value!(Option<Duration>; None),
1077    "Controls `socket.timeout.ms` for rdkafka \
1078        client connections. Defaults to the rdkafka default (60000ms) or \
1079        the set transaction timeout + 100ms, whichever one is smaller. \
1080        Cannot be greater than 300000ms, more than 100ms greater than \
1081        `kafka_transaction_timeout`, or less than 10ms.",
1082    false,
1083);
1084
1085/// Controls `transaction.timeout.ms` for rdkafka client connections. Defaults to the rdkafka default
1086/// (60000ms). Cannot be greater than `i32::MAX` or less than 1000ms.
1087pub static KAFKA_TRANSACTION_TIMEOUT: VarDefinition = VarDefinition::new(
1088    "kafka_transaction_timeout",
1089    value!(Duration; mz_kafka_util::client::DEFAULT_TRANSACTION_TIMEOUT),
1090    "Controls `transaction.timeout.ms` for rdkafka \
1091        client connections. Defaults to the 10min. \
1092        Cannot be greater than `i32::MAX` or less than 1000ms.",
1093    false,
1094);
1095
1096/// Controls `socket.connection.setup.timeout.ms` for rdkafka client connections. Defaults to the rdkafka default
1097/// (30000ms). Cannot be greater than `i32::MAX` or less than 1000ms
1098pub static KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT: VarDefinition = VarDefinition::new(
1099    "kafka_socket_connection_setup_timeout",
1100    value!(Duration; mz_kafka_util::client::DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT),
1101    "Controls `socket.connection.setup.timeout.ms` for rdkafka \
1102        client connections. Defaults to the rdkafka default (30000ms). \
1103        Cannot be greater than `i32::MAX` or less than 1000ms",
1104    false,
1105);
1106
1107/// Controls the timeout when fetching kafka metadata. Defaults to 10s.
1108pub static KAFKA_FETCH_METADATA_TIMEOUT: VarDefinition = VarDefinition::new(
1109    "kafka_fetch_metadata_timeout",
1110    value!(Duration; mz_kafka_util::client::DEFAULT_FETCH_METADATA_TIMEOUT),
1111    "Controls the timeout when fetching kafka metadata. \
1112        Defaults to 10s.",
1113    false,
1114);
1115
1116/// Controls the timeout when fetching kafka progress records. Defaults to 60s.
1117pub static KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT: VarDefinition = VarDefinition::new(
1118    "kafka_progress_record_fetch_timeout",
1119    value!(Option<Duration>; None),
1120    "Controls the timeout when fetching kafka progress records. \
1121        Defaults to 60s or the transaction timeout, whichever one is larger.",
1122    false,
1123);
1124
1125/// The maximum number of in-flight bytes emitted by persist_sources feeding _storage
1126/// dataflows_.
1127/// Currently defaults to 256MiB = 268435456 bytes
1128/// Note: Backpressure will only be turned on if disk is enabled based on
1129/// `storage_dataflow_max_inflight_bytes_disk_only` flag
1130pub static STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES: VarDefinition = VarDefinition::new(
1131    "storage_dataflow_max_inflight_bytes",
1132    value!(Option<usize>; Some(256 * 1024 * 1024)),
1133    "The maximum number of in-flight bytes emitted by persist_sources feeding \
1134        storage dataflows. Defaults to backpressure enabled (Materialize).",
1135    false,
1136);
1137
1138/// Configuration ratio to shrink unusef buffers in upsert by.
1139/// For eg: is 2 is set, then the buffers will be reduced by 2 i.e. halved.
1140/// Default is 0, which means shrinking is disabled.
1141pub static STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO: VarDefinition = VarDefinition::new(
1142    "storage_shrink_upsert_unused_buffers_by_ratio",
1143    value!(usize; 0),
1144    "Configuration ratio to shrink unusef buffers in upsert by",
1145    false,
1146);
1147
1148/// The fraction of the cluster replica size to be used as the maximum number of
1149/// in-flight bytes emitted by persist_sources feeding storage dataflows.
1150/// If not configured, the storage_dataflow_max_inflight_bytes value will be used.
1151/// For this value to be used storage_dataflow_max_inflight_bytes needs to be set.
1152pub static STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION: VarDefinition =
1153    VarDefinition::new_lazy(
1154        "storage_dataflow_max_inflight_bytes_to_cluster_size_fraction",
1155        lazy_value!(Option<Numeric>; || Some(0.01.into())),
1156        "The fraction of the cluster replica size to be used as the maximum number of \
1157            in-flight bytes emitted by persist_sources feeding storage dataflows. \
1158            If not configured, the storage_dataflow_max_inflight_bytes value will be used.",
1159        false,
1160    );
1161
1162pub static STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY: VarDefinition = VarDefinition::new(
1163    "storage_dataflow_max_inflight_bytes_disk_only",
1164    value!(bool; true),
1165    "Whether or not `storage_dataflow_max_inflight_bytes` applies only to \
1166        upsert dataflows using disks. Defaults to true (Materialize).",
1167    false,
1168);
1169
1170/// The interval to submit statistics to `mz_source_statistics_per_worker` and `mz_sink_statistics_per_worker`.
1171pub static STORAGE_STATISTICS_INTERVAL: VarDefinition = VarDefinition::new(
1172    "storage_statistics_interval",
1173    value!(Duration; mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT),
1174    "The interval to submit statistics to `mz_source_statistics_per_worker` \
1175        and `mz_sink_statistics` (Materialize).",
1176    false,
1177);
1178
1179/// The interval to collect statistics for `mz_source_statistics_per_worker` and `mz_sink_statistics_per_worker` in
1180/// clusterd. Controls the accuracy of metrics.
1181pub static STORAGE_STATISTICS_COLLECTION_INTERVAL: VarDefinition = VarDefinition::new(
1182    "storage_statistics_collection_interval",
1183    value!(Duration; mz_storage_types::parameters::STATISTICS_COLLECTION_INTERVAL_DEFAULT),
1184    "The interval to collect statistics for `mz_source_statistics_per_worker` \
1185        and `mz_sink_statistics_per_worker` in clusterd. Controls the accuracy of metrics \
1186        (Materialize).",
1187    false,
1188);
1189
1190pub static STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS: VarDefinition = VarDefinition::new(
1191    "storage_record_source_sink_namespaced_errors",
1192    value!(bool; true),
1193    "Whether or not to record namespaced errors in the status history tables",
1194    false,
1195);
1196
1197/// Boolean flag indicating whether to enable syncing from
1198/// LaunchDarkly. Can be turned off as an emergency measure to still
1199/// be able to alter parameters while LD is broken.
1200pub static ENABLE_LAUNCHDARKLY: VarDefinition = VarDefinition::new(
1201    "enable_launchdarkly",
1202    value!(bool; true),
1203    "Boolean flag indicating whether flag synchronization from LaunchDarkly should be enabled (Materialize).",
1204    false,
1205);
1206
1207/// Feature flag indicating whether real time recency is enabled. Not that
1208/// unlike other feature flags, this is made available at the session level, so
1209/// is additionally gated by a feature flag.
1210pub static REAL_TIME_RECENCY: VarDefinition = VarDefinition::new(
1211    "real_time_recency",
1212    value!(bool; false),
1213    "Feature flag indicating whether real time recency is enabled (Materialize).",
1214    true,
1215)
1216.with_feature_flag(&ALLOW_REAL_TIME_RECENCY);
1217
1218pub static REAL_TIME_RECENCY_TIMEOUT: VarDefinition = VarDefinition::new(
1219    "real_time_recency_timeout",
1220    value!(Duration; Duration::from_secs(10)),
1221    "Sets the maximum allowed duration of SELECTs that actively use real-time \
1222    recency, i.e. reach out to an external system to determine their most recencly exposed \
1223    data (Materialize).",
1224    true,
1225)
1226.with_feature_flag(&ALLOW_REAL_TIME_RECENCY);
1227
1228pub static EMIT_PLAN_INSIGHTS_NOTICE: VarDefinition = VarDefinition::new(
1229    "emit_plan_insights_notice",
1230    value!(bool; false),
1231    "Boolean flag indicating whether to send a NOTICE with JSON-formatted plan insights before executing a SELECT statement (Materialize).",
1232    true,
1233);
1234
1235pub static EMIT_TIMESTAMP_NOTICE: VarDefinition = VarDefinition::new(
1236    "emit_timestamp_notice",
1237    value!(bool; false),
1238    "Boolean flag indicating whether to send a NOTICE with timestamp explanations of queries (Materialize).",
1239    true,
1240);
1241
1242pub static EMIT_TRACE_ID_NOTICE: VarDefinition = VarDefinition::new(
1243    "emit_trace_id_notice",
1244    value!(bool; false),
1245    "Boolean flag indicating whether to send a NOTICE specifying the trace id when available (Materialize).",
1246    true,
1247);
1248
1249pub static UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP: VarDefinition = VarDefinition::new(
1250    "unsafe_mock_audit_event_timestamp",
1251    value!(Option<mz_repr::Timestamp>; None),
1252    "Mocked timestamp to use for audit events for testing purposes",
1253    false,
1254);
1255
1256pub static ENABLE_RBAC_CHECKS: VarDefinition = VarDefinition::new(
1257    "enable_rbac_checks",
1258    value!(bool; true),
1259    "User facing global boolean flag indicating whether to apply RBAC checks before \
1260        executing statements (Materialize).",
1261    true,
1262);
1263
1264pub static ENABLE_SESSION_RBAC_CHECKS: VarDefinition = VarDefinition::new(
1265    "enable_session_rbac_checks",
1266    // TODO(jkosh44) Once RBAC is enabled in all environments, change this to `true`.
1267    value!(bool; false),
1268    "User facing session boolean flag indicating whether to apply RBAC checks before \
1269        executing statements (Materialize).",
1270    true,
1271);
1272
1273pub static RESTRICT_TO_USER_OBJECTS: VarDefinition = VarDefinition::new(
1274    "restrict_to_user_objects",
1275    value!(bool; false),
1276    "When enabled, queries are restricted from accessing system catalog objects. \
1277        Useful for MCP tool queries that should only access user-created data products.",
1278    true,
1279);
1280
1281pub static EMIT_INTROSPECTION_QUERY_NOTICE: VarDefinition = VarDefinition::new(
1282    "emit_introspection_query_notice",
1283    value!(bool; true),
1284    "Whether to print a notice when querying per-replica introspection sources.",
1285    true,
1286);
1287
1288// TODO(mgree) change this to a SelectOption
1289pub static ENABLE_SESSION_CARDINALITY_ESTIMATES: VarDefinition = VarDefinition::new(
1290    "enable_session_cardinality_estimates",
1291    value!(bool; false),
1292    "Feature flag indicating whether to use cardinality estimates when optimizing queries; \
1293        does not affect EXPLAIN WITH(cardinality) (Materialize).",
1294    true,
1295)
1296.with_feature_flag(&ENABLE_CARDINALITY_ESTIMATES);
1297
1298pub static OPTIMIZER_STATS_TIMEOUT: VarDefinition = VarDefinition::new(
1299    "optimizer_stats_timeout",
1300    value!(Duration; Duration::from_millis(250)),
1301    "Sets the timeout applied to the optimizer's statistics collection from storage; \
1302        applied to non-oneshot, i.e., long-lasting queries, like CREATE MATERIALIZED VIEW (Materialize).",
1303    false,
1304);
1305
1306pub static OPTIMIZER_ONESHOT_STATS_TIMEOUT: VarDefinition = VarDefinition::new(
1307    "optimizer_oneshot_stats_timeout",
1308    value!(Duration; Duration::from_millis(10)),
1309    "Sets the timeout applied to the optimizer's statistics collection from storage; \
1310        applied to oneshot queries, like SELECT (Materialize).",
1311    false,
1312);
1313
1314pub static PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE: VarDefinition = VarDefinition::new(
1315    "privatelink_status_update_quota_per_minute",
1316    value!(u32; 20),
1317    "Sets the per-minute quota for privatelink vpc status updates to be written to \
1318        the storage-collection-backed system table. This value implies the total and burst quota per-minute.",
1319    false,
1320);
1321
1322pub static STATEMENT_LOGGING_SAMPLE_RATE: VarDefinition = VarDefinition::new_lazy(
1323    "statement_logging_sample_rate",
1324    lazy_value!(Numeric; || 0.1.into()),
1325    "User-facing session variable indicating how many statement executions should be \
1326        logged, subject to constraint by the system variable `statement_logging_max_sample_rate` (Materialize).",
1327    true,
1328).with_constraint(&NUMERIC_BOUNDED_0_1_INCLUSIVE);
1329
1330pub static ENABLE_DEFAULT_CONNECTION_VALIDATION: VarDefinition = VarDefinition::new(
1331    "enable_default_connection_validation",
1332    value!(bool; true),
1333    "LD facing global boolean flag that allows turning default connection validation off for everyone (Materialize).",
1334    false,
1335);
1336
1337pub static STATEMENT_LOGGING_MAX_DATA_CREDIT: VarDefinition = VarDefinition::new(
1338    "statement_logging_max_data_credit",
1339    value!(Option<usize>; Some(50 * 1024 * 1024)),
1340    // The idea is that during periods of low logging, tokens can accumulate up to this value,
1341    // and then be depleted during periods of high logging.
1342    "The maximum number of bytes that can be logged for statement logging in short burts, or NULL if unlimited (Materialize).",
1343    false,
1344);
1345
1346pub static STATEMENT_LOGGING_TARGET_DATA_RATE: VarDefinition = VarDefinition::new(
1347    "statement_logging_target_data_rate",
1348    value!(Option<usize>; Some(2071)),
1349    "The maximum sustained data rate of statement logging, in bytes per second, or NULL if unlimited (Materialize).",
1350    false,
1351);
1352
1353pub static STATEMENT_LOGGING_MAX_SAMPLE_RATE: VarDefinition = VarDefinition::new_lazy(
1354    "statement_logging_max_sample_rate",
1355    lazy_value!(Numeric; || 0.99.into()),
1356    "The maximum rate at which statements may be logged. If this value is less than \
1357        that of `statement_logging_sample_rate`, the latter is ignored (Materialize).",
1358    true,
1359)
1360.with_constraint(&NUMERIC_BOUNDED_0_1_INCLUSIVE);
1361
1362pub static STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE: VarDefinition = VarDefinition::new_lazy(
1363    "statement_logging_default_sample_rate",
1364    lazy_value!(Numeric; || 0.99.into()),
1365    "The default value of `statement_logging_sample_rate` for new sessions (Materialize).",
1366    true,
1367)
1368.with_constraint(&NUMERIC_BOUNDED_0_1_INCLUSIVE);
1369
1370pub static ENABLE_INTERNAL_STATEMENT_LOGGING: VarDefinition = VarDefinition::new(
1371    "enable_internal_statement_logging",
1372    value!(bool; false),
1373    "Whether to log statements from the `mz_system` user.",
1374    false,
1375);
1376
1377pub static AUTO_ROUTE_CATALOG_QUERIES: VarDefinition = VarDefinition::new(
1378    "auto_route_catalog_queries",
1379    value!(bool; true),
1380    "Whether to force queries that depend only on system tables, to run on the mz_catalog_server cluster (Materialize).",
1381    true,
1382);
1383
1384pub static MAX_CONNECTIONS: VarDefinition = VarDefinition::new(
1385    "max_connections",
1386    value!(u32; 5000),
1387    "The maximum number of concurrent connections (PostgreSQL).",
1388    true,
1389);
1390
1391pub static SUPERUSER_RESERVED_CONNECTIONS: VarDefinition = VarDefinition::new(
1392    "superuser_reserved_connections",
1393    value!(u32; 3),
1394    "The number of connections that are reserved for superusers (PostgreSQL).",
1395    true,
1396);
1397
1398/// Controls [`mz_storage_types::parameters::StorageParameters::keep_n_source_status_history_entries`].
1399pub static KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES: VarDefinition = VarDefinition::new(
1400    "keep_n_source_status_history_entries",
1401    value!(usize; 5),
1402    "On reboot, truncate all but the last n entries per ID in the source_status_history collection (Materialize).",
1403    false,
1404);
1405
1406/// Controls [`mz_storage_types::parameters::StorageParameters::keep_n_sink_status_history_entries`].
1407pub static KEEP_N_SINK_STATUS_HISTORY_ENTRIES: VarDefinition = VarDefinition::new(
1408    "keep_n_sink_status_history_entries",
1409    value!(usize; 5),
1410    "On reboot, truncate all but the last n entries per ID in the sink_status_history collection (Materialize).",
1411    false,
1412);
1413
1414/// Controls [`mz_storage_types::parameters::StorageParameters::keep_n_privatelink_status_history_entries`].
1415pub static KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES: VarDefinition = VarDefinition::new(
1416    "keep_n_privatelink_status_history_entries",
1417    value!(usize; 5),
1418    "On reboot, truncate all but the last n entries per ID in the mz_aws_privatelink_connection_status_history \
1419        collection (Materialize).",
1420    false,
1421);
1422
1423/// Controls [`mz_storage_types::parameters::StorageParameters::replica_status_history_retention_window`].
1424pub static REPLICA_STATUS_HISTORY_RETENTION_WINDOW: VarDefinition = VarDefinition::new(
1425    "replica_status_history_retention_window",
1426    value!(Duration; REPLICA_STATUS_HISTORY_RETENTION_WINDOW_DEFAULT),
1427    "On reboot, truncate up all entries past the retention window in the mz_cluster_replica_status_history \
1428        collection (Materialize).",
1429    false,
1430);
1431
1432pub static ENABLE_STORAGE_SHARD_FINALIZATION: VarDefinition = VarDefinition::new(
1433    "enable_storage_shard_finalization",
1434    value!(bool; true),
1435    "Whether to allow the storage client to finalize shards (Materialize).",
1436    false,
1437);
1438
1439pub static ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE: VarDefinition = VarDefinition::new(
1440    "enable_consolidate_after_union_negate",
1441    value!(bool; true),
1442    "consolidation after Unions that have a Negated input (Materialize).",
1443    true,
1444);
1445
1446pub static DEFAULT_TIMESTAMP_INTERVAL: VarDefinition = VarDefinition::new(
1447    "default_timestamp_interval",
1448    value!(Duration; Duration::from_millis(1000)),
1449    "The interval at which timestamps are assigned to data from sources and tables.",
1450    false,
1451);
1452
/// System variable `min_timestamp_interval`: the minimum timestamp interval.
/// Defaults to 1000 ms.
1453pub static MIN_TIMESTAMP_INTERVAL: VarDefinition = VarDefinition::new(
1454    "min_timestamp_interval",
1455    value!(Duration; Duration::from_millis(1000)),
1456    "Minimum timestamp interval",
1457    false,
1458);
1459
/// System variable `max_timestamp_interval`: the maximum timestamp interval.
/// Defaults to 1000 ms.
///
/// NOTE(review): this default equals the default of `min_timestamp_interval` (both 1000 ms),
/// which presumably leaves a single permitted value until one of them is reconfigured —
/// confirm that this is intended.
1460pub static MAX_TIMESTAMP_INTERVAL: VarDefinition = VarDefinition::new(
1461    "max_timestamp_interval",
1462    value!(Duration; Duration::from_millis(1000)),
1463    "Maximum timestamp interval",
1464    false,
1465);
1466
/// System variable `webhook_concurrent_request_limit`: the maximum number of concurrent
/// requests for appending to a webhook source.
///
/// The `usize` default is `WEBHOOK_CONCURRENCY_LIMIT`, which is defined outside this section.
1467pub static WEBHOOK_CONCURRENT_REQUEST_LIMIT: VarDefinition = VarDefinition::new(
1468    "webhook_concurrent_request_limit",
1469    value!(usize; WEBHOOK_CONCURRENCY_LIMIT),
1470    "Maximum number of concurrent requests for appending to a webhook source.",
1471    false,
1472);
1473
/// System variable `user_storage_managed_collections_batch_duration`, a `Duration` whose
/// default is `STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT` (imported from
/// `mz_storage_types::parameters`).
///
/// NOTE(review): the user-facing description below talks about "a webhook source", while the
/// variable name and its default constant refer to storage-managed collections. The text
/// looks copy-pasted from a webhook variable — confirm and reword.
1474pub static USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION: VarDefinition = VarDefinition::new(
1475    "user_storage_managed_collections_batch_duration",
1476    value!(Duration; STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT),
1477    "Duration which we'll wait to collect a batch of events for a webhook source.",
1478    false,
1479);
1480
1481// This system var will need to point to the name of an existing network policy
1482// this will be enforced on alter_system_set
//
// The `String` default cannot be written as a constant expression, so this definition goes
// through `VarDefinition::new_lazy` with a `lazy_value!` closure that yields "default".
/// System variable `network_policy`: the fallback network policy applied to all users
/// without an explicit policy. Defaults to the policy named `default`.
1483pub static NETWORK_POLICY: VarDefinition = VarDefinition::new_lazy(
1484    "network_policy",
1485    lazy_value!(String; || "default".to_string()),
1486    "Sets the fallback network policy applied to all users without an explicit policy.",
// NOTE(review): the trailing `true` is presumably the user-visibility flag — confirm against
// the signature of `VarDefinition::new_lazy`.
1487    true,
1488);
1489
/// System variable `force_source_table_syntax`: force use of the new source model
/// (`CREATE TABLE .. FROM SOURCE`) and migrate existing sources. Boolean, defaults to `false`.
1490pub static FORCE_SOURCE_TABLE_SYNTAX: VarDefinition = VarDefinition::new(
1491    "force_source_table_syntax",
1492    value!(bool; false),
1493    "Force use of new source model (CREATE TABLE .. FROM SOURCE) and migrate existing sources",
// NOTE(review): the trailing `true` is presumably `VarDefinition::new`'s user-visibility
// flag — confirm against its signature.
1494    true,
1495);
1496
/// System variable `optimizer_e2e_latency_warning_threshold`: how long a query may take to
/// compile before a warning is triggered. Defaults to 500 ms.
///
/// Per the user-facing description, a value without units is read as milliseconds and zero
/// disables the check.
1497pub static OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD: VarDefinition = VarDefinition::new(
1498    "optimizer_e2e_latency_warning_threshold",
1499    value!(Duration; Duration::from_millis(500)),
1500    "Sets the duration that a query can take to compile; queries that take longer \
1501        will trigger a warning. If this value is specified without units, it is taken as \
1502        milliseconds. A value of zero disables the timeout (Materialize).",
// NOTE(review): the trailing `true` is presumably `VarDefinition::new`'s user-visibility
// flag — confirm against its signature.
1503    true,
1504);
1505
1506/// Configuration for gRPC client connections.
///
/// Every variable defined here carries the `grpc_client_` name prefix and a `Duration` value.
1507pub mod grpc_client {
1508    use super::*;
1509
    /// `grpc_client_connect_timeout`: timeout applied to initial gRPC client connection
    /// establishment. Defaults to 5 seconds.
1510    pub static CONNECT_TIMEOUT: VarDefinition = VarDefinition::new(
1511        "grpc_client_connect_timeout",
1512        value!(Duration; Duration::from_secs(5)),
1513        "Timeout to apply to initial gRPC client connection establishment.",
1514        false,
1515    );
1516
    /// `grpc_client_http2_keep_alive_interval`: idle time to wait before sending HTTP/2 PINGs
    /// on an established connection. Defaults to 3 seconds.
1517    pub static HTTP2_KEEP_ALIVE_INTERVAL: VarDefinition = VarDefinition::new(
1518        "grpc_client_http2_keep_alive_interval",
1519        value!(Duration; Duration::from_secs(3)),
1520        "Idle time to wait before sending HTTP/2 PINGs to maintain established gRPC client connections.",
1521        false,
1522    );
1523
    /// `grpc_client_http2_keep_alive_timeout`: time to wait for an HTTP/2 pong response before
    /// terminating the connection. Defaults to 60 seconds.
1524    pub static HTTP2_KEEP_ALIVE_TIMEOUT: VarDefinition = VarDefinition::new(
1525        "grpc_client_http2_keep_alive_timeout",
1526        value!(Duration; Duration::from_secs(60)),
1527        "Time to wait for HTTP/2 pong response before terminating a gRPC client connection.",
1528        false,
1529    );
1530}
1531
1532/// Configuration for how cluster replicas are scheduled.
///
/// Defaults named `DEFAULT_*` that are not declared as `const`s in this module are presumably
/// supplied by the `mz_orchestrator::scheduling_config::*` glob import below — confirm there.
1533pub mod cluster_scheduling {
1534    use super::*;
1535    use mz_orchestrator::scheduling_config::*;
1536
    /// `cluster_multi_process_replica_az_affinity_weight`: optional (`Option<i32>`) affinity
    /// weight between instances of multi-process replicas; the description calls an empty
    /// value "off".
1537    pub static CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT: VarDefinition = VarDefinition::new(
1538        "cluster_multi_process_replica_az_affinity_weight",
1539        value!(Option<i32>; DEFAULT_POD_AZ_AFFINITY_WEIGHT),
1540        "Whether or not to add an availability zone affinity between instances of \
1541            multi-process replicas. Either an affinity weight or empty (off) (Materialize).",
1542        false,
1543    );
1544
    /// `cluster_soften_replication_anti_affinity`: whether the node-scope anti-affinity
    /// between replicas of one cluster becomes a preference.
1545    pub static CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY: VarDefinition = VarDefinition::new(
1546        "cluster_soften_replication_anti_affinity",
1547        value!(bool; DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY),
1548        "Whether or not to turn the node-scope anti affinity between replicas \
1549            in the same cluster into a preference (Materialize).",
1550        false,
1551    );
1552
    /// `cluster_soften_replication_anti_affinity_weight`: the `i32` preference weight that
    /// goes with `cluster_soften_replication_anti_affinity`.
1553    pub static CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT: VarDefinition = VarDefinition::new(
1554        "cluster_soften_replication_anti_affinity_weight",
1555        value!(i32; DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT),
1556        "The preference weight for `cluster_soften_replication_anti_affinity` (Materialize).",
1557        false,
1558    );
1559
    /// `cluster_enable_topology_spread`: whether topology spread constraints are added among
    /// replicas in the same cluster.
1560    pub static CLUSTER_ENABLE_TOPOLOGY_SPREAD: VarDefinition = VarDefinition::new(
1561        "cluster_enable_topology_spread",
1562        value!(bool; DEFAULT_TOPOLOGY_SPREAD_ENABLED),
1563        "Whether or not to add topology spread constraints among replicas in the same cluster (Materialize).",
1564        false,
1565    );
1566
    /// `cluster_topology_spread_ignore_non_singular_scale`: when true, replicas with more than
    /// one process are ignored when adding topology spread constraints.
1567    pub static CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE: VarDefinition =
1568        VarDefinition::new(
1569            "cluster_topology_spread_ignore_non_singular_scale",
1570            value!(bool; DEFAULT_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE),
1571            "If true, ignore replicas with more than 1 process when adding topology spread constraints (Materialize).",
1572            false,
1573        );
1574
    /// `cluster_topology_spread_max_skew`: the `maxSkew` (`i32`) for replica topology spread
    /// constraints.
1575    pub static CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW: VarDefinition = VarDefinition::new(
1576        "cluster_topology_spread_max_skew",
1577        value!(i32; DEFAULT_TOPOLOGY_SPREAD_MAX_SKEW),
1578        "The `maxSkew` for replica topology spread constraints (Materialize).",
1579        false,
1580    );
1581
1582    // `minDomains`, like maxSkew, is used to spread across a topology
1583    // key. Unlike max skew, minDomains will force node creation to ensure
1584    // distribution across a minimum number of keys.
1585    // https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/#spread-constraint-definition
    /// `cluster_topology_spread_min_domains`: optional (`Option<i32>`) `minDomains` for
    /// replica topology spread constraints. Defaults to `None`.
1586    pub static CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS: VarDefinition = VarDefinition::new(
1587        "cluster_topology_spread_min_domains",
1588        value!(Option<i32>; None),
1589        "`minDomains` for replica topology spread constraints. \
1590            Should be set to the number of Availability Zones (Materialize).",
1591        false,
1592    );
1593
    /// `cluster_topology_spread_soft`: when true, the topology spread constraints for replicas
    /// are softened.
1594    pub static CLUSTER_TOPOLOGY_SPREAD_SOFT: VarDefinition = VarDefinition::new(
1595        "cluster_topology_spread_soft",
1596        value!(bool; DEFAULT_TOPOLOGY_SPREAD_SOFT),
1597        "If true, soften the topology spread constraints for replicas (Materialize).",
1598        false,
1599    );
1600
    /// `cluster_soften_az_affinity`: boolean switch concerning the az-scope node affinity for
    /// replicas.
    ///
    /// NOTE(review): the description's first sentence ("Whether or not to turn the az-scope
    /// node affinity for replicas.") looks truncated; by analogy with
    /// `cluster_soften_replication_anti_affinity` it presumably should end with
    /// "into a preference" — confirm.
1601    pub static CLUSTER_SOFTEN_AZ_AFFINITY: VarDefinition = VarDefinition::new(
1602        "cluster_soften_az_affinity",
1603        value!(bool; DEFAULT_SOFTEN_AZ_AFFINITY),
1604        "Whether or not to turn the az-scope node affinity for replicas. \
1605            Note this could violate requests from the user (Materialize).",
1606        false,
1607    );
1608
    /// `cluster_soften_az_affinity_weight`: the `i32` preference weight that goes with
    /// `cluster_soften_az_affinity`.
1609    pub static CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT: VarDefinition = VarDefinition::new(
1610        "cluster_soften_az_affinity_weight",
1611        value!(i32; DEFAULT_SOFTEN_AZ_AFFINITY_WEIGHT),
1612        "The preference weight for `cluster_soften_az_affinity` (Materialize).",
1613        false,
1614    );
1615
    // Default for `cluster_alter_check_ready_interval`: 3 seconds.
1616    const DEFAULT_CLUSTER_ALTER_CHECK_READY_INTERVAL: Duration = Duration::from_secs(3);
1617
    /// `cluster_alter_check_ready_interval`: how often readiness checks are polled for a
    /// cluster alter. Defaults to 3 seconds.
1618    pub static CLUSTER_ALTER_CHECK_READY_INTERVAL: VarDefinition = VarDefinition::new(
1619        "cluster_alter_check_ready_interval",
1620        value!(Duration; DEFAULT_CLUSTER_ALTER_CHECK_READY_INTERVAL),
1621        "How often to poll readiness checks for cluster alter",
1622        false,
1623    );
1624
    // Default for `cluster_check_scheduling_policies_interval`: 3 seconds.
1625    const DEFAULT_CHECK_SCHEDULING_POLICIES_INTERVAL: Duration = Duration::from_secs(3);
1626
    /// `cluster_check_scheduling_policies_interval`: how often the policies that automatically
    /// start/stop clusters are invoked. Defaults to 3 seconds.
1627    pub static CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL: VarDefinition = VarDefinition::new(
1628        "cluster_check_scheduling_policies_interval",
1629        value!(Duration; DEFAULT_CHECK_SCHEDULING_POLICIES_INTERVAL),
1630        "How often policies are invoked to automatically start/stop clusters, e.g., \
1631            for REFRESH EVERY materialized views.",
1632        false,
1633    );
1634
    /// `cluster_security_context_enabled`: enables SecurityContext for clusterd instances.
1635    pub static CLUSTER_SECURITY_CONTEXT_ENABLED: VarDefinition = VarDefinition::new(
1636        "cluster_security_context_enabled",
1637        value!(bool; DEFAULT_SECURITY_CONTEXT_ENABLED),
1638        "Enables SecurityContext for clusterd instances, restricting capabilities to improve security.",
1639        false,
1640    );
1641
    // Default for `cluster_refresh_mv_compaction_estimate`: 1200 seconds (20 minutes).
1642    const DEFAULT_CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE: Duration = Duration::from_secs(1200);
1643
    /// `cluster_refresh_mv_compaction_estimate`: how long to wait for compaction after a
    /// REFRESH materialized view completes a refresh before the refresh cluster is turned off.
    /// Defaults to 1200 seconds.
1644    pub static CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE: VarDefinition = VarDefinition::new(
1645        "cluster_refresh_mv_compaction_estimate",
1646        value!(Duration; DEFAULT_CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE),
1647        "How much time to wait for compaction after a REFRESH MV completes a refresh \
1648            before turning off the refresh cluster. This is needed because Persist does compaction \
1649            only after a write, but refresh MVs do writes only at their refresh times. \
1650            (In the long term, we'd like to remove this configuration and instead wait exactly \
1651            until compaction has settled. We'd need some new Persist API for this.)",
1652        false,
1653    );
1654}
1655
1656/// Macro to simplify creating feature flags, i.e. boolean flags that we use to toggle the
1657/// availability of features.
1658///
1659/// Each flag is passed to `feature_flags!` as a braced group with four required fields:
1660/// - `name`, which will be the name of the feature flag, in snake_case
1661/// - `desc`, a human-readable description of the feature
1662/// - `default`, the default value of the flag
/// - `enable_for_item_parsing`, whether `SystemVars::enable_for_item_parsing` switches the
///   flag on
///
/// For every flag the macro generates a private `<NAME>_VAR` [`VarDefinition`], a public
/// `<NAME>` `FeatureFlag` that points at it, and a `SystemVars` getter named after the flag.
/// It also generates the `FEATURE_FLAGS` slice and the `SystemVars` methods
/// `enable_all_feature_flags_by_default` and `enable_for_item_parsing`.
1663///
1664/// Note that not all `VarDefinition<bool>` are feature flags. Feature flags are for variables that:
1665/// - Belong to `SystemVars`, _not_ `SessionVars`
1666/// - Default to false and must be explicitly enabled, or default to `true` and can be explicitly disabled.
1667///
1668/// WARNING / CONTRACT: Syntax-related feature flags must always *enable* behavior. In other words,
1669/// setting a feature flag must make the system more permissive. For example, let's suppose we'd like
1670/// to gate deprecated upsert syntax behind a feature flag. In this case, do not add a feature flag
1671/// like `disable_deprecated_upsert_syntax`, as `disable_deprecated_upsert_syntax = on` would
1672/// _prevent_ the system from parsing the deprecated upsert syntax. Instead, use a feature flag
1673/// like `enable_deprecated_upsert_syntax`.
1674///
1675/// The hazard this protects against is related to reboots after feature flags have been disabled.
1676/// Say someone creates a Kinesis source while `enable_kinesis_sources = on`. Materialize will
1677/// commit this source to the system catalog. Then, suppose we discover a catastrophic bug in
1678/// Kinesis sources and set `enable_kinesis_sources` to `off`. This prevents users from creating
1679/// new Kinesis sources, but leaves the existing Kinesis sources in place. This is because
1680/// disabling a feature flag doesn't remove access to catalog objects created while the feature
1681/// flag was live. On the next reboot, Materialize will proceed to load the Kinesis source from the
1682/// catalog, reparsing and replanning the `CREATE SOURCE` definition and rechecking the
1683/// `enable_kinesis_sources` feature flag along the way. Even though the feature flag has been
1684/// switched to `off`, we need to temporarily re-enable it during parsing and planning to be able
1685/// to boot successfully.
1686///
1687/// Ensuring that all syntax-related feature flags *enable* behavior means that setting all such
1688/// feature flags to `on` during catalog boot has the desired effect.
1689macro_rules! feature_flags {
1690    // Internal arm: match `name`, `desc` and `default` for a single flag.
1691    (@inner
1692        // The feature flag name.
1693        name: $name:expr,
1694        // The feature flag description.
1695        desc: $desc:literal,
1696        // The feature flag default value.
1697        default: $value:expr,
1698    ) => {
1699        paste::paste!{
1700            // Note that the ServerVar is not directly exported; we expect these to be
1701            // accessible through their FeatureFlag variant.
            //
            // The user-facing description is assembled as
            // "Whether <desc> is allowed (Materialize).", so `desc` reads best as a noun phrase.
1702            static [<$name:upper _VAR>]: VarDefinition = VarDefinition::new(
1703                stringify!($name),
1704                value!(bool; $value),
1705                concat!("Whether ", $desc, " is allowed (Materialize)."),
1706                false,
1707            );
1708
            // Public handle: pairs the definition above with the raw, unwrapped `desc`.
1709            pub static [<$name:upper >]: FeatureFlag = FeatureFlag {
1710                flag: &[<$name:upper _VAR>],
1711                feature_desc: $desc,
1712            };
1713        }
1714    };
    // Public arm: one or more braced flag groups, each followed by a comma.
1715    ($({
1716        // The feature flag name.
1717        name: $name:expr,
1718        // The feature flag description.
1719        desc: $desc:literal,
1720        // The feature flag default value.
1721        default: $value:expr,
1722        // Should the feature be turned on during catalog rehydration when
1723        // parsing a catalog item.
1724        enable_for_item_parsing: $enable_for_item_parsing:expr,
1725    },)+) => {
        // Emit the `<NAME>_VAR` / `<NAME>` pair for every flag via the `@inner` arm.
1726        $(feature_flags! { @inner
1727            name: $name,
1728            desc: $desc,
1729            default: $value,
1730        })+
1731
        // All generated definitions, in declaration order.
1732        paste::paste!{
1733            pub static FEATURE_FLAGS: &'static [&'static VarDefinition] = &[
1734                $(  & [<$name:upper _VAR>] , )+
1735            ];
1736        }
1737
1738        paste::paste!{
1739            impl super::SystemVars {
                // Sets the default of every declared flag to "on", regardless of its
                // `default` or `enable_for_item_parsing` fields. Panics if `set_default` fails.
1740                pub fn enable_all_feature_flags_by_default(&mut self) {
1741                    $(
1742                        self.set_default(stringify!($name), super::VarInput::Flat("on"))
1743                            .expect("setting default value must work");
1744                    )+
1745                }
1746
                // Sets to "on" exactly those flags declared with
                // `enable_for_item_parsing: true`. Panics if `set` fails.
                // NOTE(review): this calls `set`, not `set_default`, yet the panic message
                // below still says "default".
1747                pub fn enable_for_item_parsing(&mut self) {
1748                    $(
1749                        if $enable_for_item_parsing {
1750                            self.set(stringify!($name), super::VarInput::Flat("on"))
1751                                .expect("setting default value must work");
1752                        }
1753                    )+
1754                }
1755
                // One getter per flag, named after the flag, returning its current value.
1756                $(
1757                    pub fn [<$name:lower>](&self) -> bool {
1758                        *self.expect_value(&[<$name:upper _VAR>])
1759                    }
1760                )+
1761            }
1762        }
1763    }
1764}
1765
// The list of feature flags. Each `{ .. }` entry supplies `name`, `desc`, `default` and
// `enable_for_item_parsing`, in that order, and is followed by a comma.
//
// Only plain `//` comments may be used inside this invocation: a `///` doc comment is turned
// into attribute tokens and would no longer match the macro's entry pattern.
//
// NOTE(review): the macro wraps `desc` as "Whether <desc> is allowed (Materialize).". Entries
// whose `desc` is already a sentence (for example `enable_create_table_from_source`, which
// starts with "Whether to allow" and ends with a period) therefore produce an awkward
// user-facing description. Consider rephrasing those as noun phrases.
1766feature_flags!(
1767    // Gates for other feature flags
1768    {
1769        name: allow_real_time_recency,
1770        desc: "real time recency",
1771        default: false,
1772        enable_for_item_parsing: true,
1773    },
1774    // Actual feature flags
1775    {
1776        name: enable_binary_date_bin,
1777        desc: "the binary version of date_bin function",
1778        default: false,
1779        enable_for_item_parsing: true,
1780    },
1781    {
1782        name: enable_date_bin_hopping,
1783        desc: "the date_bin_hopping function",
1784        default: false,
1785        enable_for_item_parsing: true,
1786    },
1787    {
1788        name: enable_envelope_debezium_in_subscribe,
1789        desc: "`ENVELOPE DEBEZIUM (KEY (..))`",
1790        default: false,
1791        enable_for_item_parsing: true,
1792    },
1793    {
1794        name: enable_envelope_materialize,
1795        desc: "ENVELOPE MATERIALIZE",
1796        default: false,
1797        enable_for_item_parsing: true,
1798    },
1799    {
1800        name: enable_explain_pushdown,
1801        desc: "EXPLAIN FILTER PUSHDOWN",
1802        default: true,
1803        enable_for_item_parsing: true,
1804    },
1805    {
1806        name: enable_index_options,
1807        desc: "INDEX OPTIONS",
1808        default: false,
1809        enable_for_item_parsing: true,
1810    },
1811    {
1812        name: enable_list_length_max,
1813        desc: "the list_length_max function",
1814        default: false,
1815        enable_for_item_parsing: true,
1816    },
1817    {
1818        name: enable_list_n_layers,
1819        desc: "the list_n_layers function",
1820        default: false,
1821        enable_for_item_parsing: true,
1822    },
1823    {
1824        name: enable_list_remove,
1825        desc: "the list_remove function",
1826        default: false,
1827        enable_for_item_parsing: true,
1828    },
1829    {
1830
1831        name: enable_logical_compaction_window,
1832        desc: "RETAIN HISTORY",
1833        default: false,
1834        enable_for_item_parsing: true,
1835    },
1836    {
1837        name: enable_primary_key_not_enforced,
1838        desc: "PRIMARY KEY NOT ENFORCED",
1839        default: false,
1840        enable_for_item_parsing: true,
1841    },
1842    {
1843        name: enable_collection_partition_by,
1844        desc: "PARTITION BY",
1845        default: true,
1846        enable_for_item_parsing: true,
1847    },
1848    {
1849        name: enable_multi_worker_storage_persist_sink,
1850        desc: "multi-worker storage persist sink",
1851        default: true,
1852        enable_for_item_parsing: true,
1853    },
1854    {
1855        name: enable_persist_streaming_snapshot_and_fetch,
1856        desc: "use the new streaming consolidate for snapshot_and_fetch",
1857        default: false,
1858        enable_for_item_parsing: true,
1859    },
1860    {
1861        name: enable_persist_streaming_compaction,
1862        desc: "use the new streaming consolidate for compaction",
1863        default: false,
1864        enable_for_item_parsing: true,
1865    },
1866    {
1867        name: enable_raise_statement,
1868        desc: "RAISE statement",
1869        default: false,
1870        enable_for_item_parsing: true,
1871    },
1872    {
1873        name: enable_repeat_row,
1874        desc: "the repeat_row function",
1875        default: false,
1876        enable_for_item_parsing: true,
1877    },
1878    {
1879        name: enable_repeat_row_non_negative,
1880        desc: "the repeat_row_non_negative function",
1881        default: false,
1882        enable_for_item_parsing: true,
1883    },
1884    {
1885        name: enable_replica_targeted_materialized_views,
1886        desc: "replica-targeted materialized views",
1887        default: false,
1888        enable_for_item_parsing: true,
1889    },
1890    {
1891        name: unsafe_enable_table_check_constraint,
1892        desc: "CREATE TABLE with a check constraint",
1893        default: false,
1894        enable_for_item_parsing: true,
1895    },
1896    {
1897        name: unsafe_enable_table_foreign_key,
1898        desc: "CREATE TABLE with a foreign key",
1899        default: false,
1900        enable_for_item_parsing: true,
1901    },
1902    {
1903        name: unsafe_enable_table_keys,
1904        desc: "CREATE TABLE with a primary key or unique constraint",
1905        default: false,
1906        enable_for_item_parsing: true,
1907    },
1908    {
1909        name: unsafe_enable_unorchestrated_cluster_replicas,
1910        desc: "unorchestrated cluster replicas",
1911        default: false,
1912        enable_for_item_parsing: true,
1913    },
1914    {
1915        name: unsafe_enable_unstable_dependencies,
1916        desc: "depending on unstable objects",
1917        default: false,
1918        enable_for_item_parsing: true,
1919    },
1920    {
1921        name: enable_within_timestamp_order_by_in_subscribe,
1922        desc: "`WITHIN TIMESTAMP ORDER BY ..`",
1923        default: false,
1924        enable_for_item_parsing: true,
1925    },
1926    {
1927        name: enable_cardinality_estimates,
1928        desc: "join planning with cardinality estimates",
1929        default: false,
1930        enable_for_item_parsing: false,
1931    },
1932    {
1933        name: enable_connection_validation_syntax,
1934        desc: "CREATE CONNECTION .. WITH (VALIDATE) and VALIDATE CONNECTION syntax",
1935        default: true,
1936        enable_for_item_parsing: true,
1937    },
1938    {
1939        name: enable_kafka_broker_matching_rules,
1940        desc: "MATCHING broker rules in BROKERS for Kafka PrivateLink connections",
1941        default: false,
1942        enable_for_item_parsing: true,
1943    },
1944    {
1945        name: enable_alter_set_cluster,
1946        desc: "ALTER ... SET CLUSTER syntax",
1947        default: false,
1948        enable_for_item_parsing: true,
1949    },
1950    {
1951        name: unsafe_enable_unsafe_functions,
1952        desc: "executing potentially dangerous functions",
1953        default: false,
1954        enable_for_item_parsing: true,
1955    },
1956    {
1957        name: enable_managed_cluster_availability_zones,
1958        desc: "MANAGED, AVAILABILITY ZONES syntax",
1959        default: false,
1960        enable_for_item_parsing: true,
1961    },
1962    {
1963        name: statement_logging_use_reproducible_rng,
1964        desc: "statement logging with reproducible RNG",
1965        default: false,
1966        enable_for_item_parsing: false,
1967    },
1968    {
1969        name: enable_notices_for_index_already_exists,
1970        desc: "emitting notices for IndexAlreadyExists (doesn't affect EXPLAIN)",
1971        default: true,
1972        enable_for_item_parsing: true,
1973    },
1974    {
1975        name: enable_notices_for_index_too_wide_for_literal_constraints,
1976        desc: "emitting notices for IndexTooWideForLiteralConstraints (doesn't affect EXPLAIN)",
1977        default: false,
1978        enable_for_item_parsing: true,
1979    },
1980    {
1981        name: enable_notices_for_index_empty_key,
1982        desc: "emitting notices for indexes with an empty key (doesn't affect EXPLAIN)",
1983        default: true,
1984        enable_for_item_parsing: true,
1985    },
1986    {
1987        name: enable_notices_for_equals_null,
1988        desc: "emitting notices for `= NULL` and `<> NULL` comparisons (doesn't affect EXPLAIN)",
1989        default: true,
1990        enable_for_item_parsing: true,
1991    },
1992    {
1993        name: enable_alter_swap,
1994        desc: "the ALTER SWAP feature for objects",
1995        default: true,
1996        enable_for_item_parsing: true,
1997    },
1998    {
1999        name: enable_new_outer_join_lowering,
2000        desc: "new outer join lowering",
2001        default: true,
2002        enable_for_item_parsing: false,
2003    },
2004    {
2005        name: enable_time_at_time_zone,
2006        desc: "use of AT TIME ZONE or timezone() with time type",
2007        default: false,
2008        enable_for_item_parsing: true,
2009    },
2010    {
2011        name: enable_load_generator_counter,
2012        desc: "Create a LOAD GENERATOR COUNTER",
2013        default: false,
2014        enable_for_item_parsing: true,
2015    },
2016    {
2017        name: enable_load_generator_clock,
2018        desc: "Create a LOAD GENERATOR CLOCK",
2019        default: false,
2020        enable_for_item_parsing: true,
2021    },
2022    {
2023        name: enable_load_generator_datums,
2024        desc: "Create a LOAD GENERATOR DATUMS",
2025        default: false,
2026        enable_for_item_parsing: true,
2027    },
2028    {
2029        name: enable_load_generator_key_value,
2030        desc: "Create a LOAD GENERATOR KEY VALUE",
2031        default: false,
2032        enable_for_item_parsing: true,
2033    },
2034    {
2035        name: enable_expressions_in_limit_syntax,
2036        desc: "LIMIT <expr> syntax",
2037        default: true,
2038        enable_for_item_parsing: true,
2039    },
2040    {
2041        name: enable_mz_notices,
2042        desc: "Populate the contents of `mz_internal.mz_notices`",
2043        default: true,
2044        enable_for_item_parsing: false,
2045    },
2046    {
2047        name: enable_eager_delta_joins,
2048        desc:
2049            "eager delta joins",
2050        default: false,
2051        enable_for_item_parsing: false,
2052    },
2053    {
2054        name: enable_off_thread_optimization,
2055        desc: "use off-thread optimization in `CREATE` statements",
2056        default: true,
2057        enable_for_item_parsing: false,
2058    },
2059    {
2060        name: enable_refresh_every_mvs,
2061        desc: "REFRESH EVERY and REFRESH AT materialized views",
2062        default: false,
2063        enable_for_item_parsing: true,
2064    },
2065    {
2066        name: enable_cluster_schedule_refresh,
2067        desc: "`SCHEDULE = ON REFRESH` cluster option",
2068        default: false,
2069        enable_for_item_parsing: true,
2070    },
2071    {
2072        name: enable_reduce_mfp_fusion,
2073        desc: "fusion of MFPs in reductions",
2074        default: true,
2075        enable_for_item_parsing: false,
2076    },
2077    {
2078        name: enable_worker_core_affinity,
2079        desc: "set core affinity for replica worker threads",
2080        default: false,
2081        enable_for_item_parsing: false,
2082    },
2083    {
2084        name: enable_storage_introspection_logs,
2085        desc: "forward storage timely logging events into compute's introspection dataflow",
2086        default: false,
2087        enable_for_item_parsing: false,
2088    },
2089    {
2090        name: enable_session_timelines,
2091        desc: "strong session serializable isolation levels",
2092        default: false,
2093        enable_for_item_parsing: false,
2094    },
2095    {
2096        name: enable_variadic_left_join_lowering,
2097        desc: "Enable joint HIR ⇒ MIR lowering of stacks of left joins",
2098        default: true,
2099        enable_for_item_parsing: false,
2100    },
2101    {
2102        name: enable_redacted_test_option,
2103        desc: "Enable useless option to test value redaction",
2104        default: false,
2105        enable_for_item_parsing: true,
2106    },
2107    {
2108        name: enable_letrec_fixpoint_analysis,
2109        desc: "Enable Lattice-based fixpoint iteration on LetRec nodes in the Analysis framework",
2110        default: true, // This is just a failsafe switch for the deployment of materialize#25591.
2111        enable_for_item_parsing: false,
2112    },
2113    {
2114        name: enable_kafka_sink_headers,
2115        desc: "Enable the HEADERS option for Kafka sinks",
2116        default: false,
2117        enable_for_item_parsing: true,
2118    },
2119    {
2120        name: enable_unlimited_retain_history,
2121        desc: "Disable limits on RETAIN HISTORY (below 1s default, and 0 disables compaction).",
2122        default: false,
2123        enable_for_item_parsing: true,
2124    },
2125    {
2126        name: enable_envelope_upsert_inline_errors,
2127        desc: "The VALUE DECODING ERRORS = INLINE option on ENVELOPE UPSERT",
2128        default: true,
2129        enable_for_item_parsing: true,
2130    },
2131    {
2132        name: enable_alter_table_add_column,
2133        desc: "Enable ALTER TABLE ... ADD COLUMN ...",
2134        default: false,
2135        enable_for_item_parsing: false,
2136    },
2137    {
2138        name: enable_zero_downtime_cluster_reconfiguration,
2139        desc: "Enable zero-downtime reconfiguration for alter cluster",
2140        default: false,
2141        enable_for_item_parsing: false,
2142    },
2143    {
2144        name: enable_network_policies,
2145        desc: "ENABLE NETWORK POLICIES",
2146        default: true,
2147        enable_for_item_parsing: true,
2148    },
2149    {
2150        name: enable_create_table_from_source,
2151        desc: "Whether to allow CREATE TABLE .. FROM SOURCE syntax.",
2152        default: true,
2153        enable_for_item_parsing: true,
2154    },
2155    {
2156        name: enable_join_prioritize_arranged,
2157        desc: "Whether join planning should prioritize already-arranged keys over keys with more fields.",
2158        default: false,
2159        enable_for_item_parsing: false,
2160    },
2161    {
2162        name: enable_projection_pushdown_after_relation_cse,
2163        desc: "Run ProjectionPushdown one more time after the last RelationCSE.",
2164        default: true,
2165        enable_for_item_parsing: false,
2166    },
2167    {
2168        name: enable_less_reduce_in_eqprop,
2169        desc: "Run MSE::reduce in EquivalencePropagation only if reduce_expr changed something.",
2170        default: true,
2171        enable_for_item_parsing: false,
2172    },
2173    {
2174        name: enable_dequadratic_eqprop_map,
2175        desc: "Skip the quadratic part of EquivalencePropagation's handling of Map.",
2176        default: true,
2177        enable_for_item_parsing: false,
2178    },
2179    {
2180        name: enable_eq_classes_withholding_errors,
2181        desc: "Use `EquivalenceClassesWithholdingErrors` instead of raw `EquivalenceClasses` during eq prop for joins.",
2182        default: true,
2183        enable_for_item_parsing: false,
2184    },
2185    {
2186        name: enable_fast_path_plan_insights,
2187        desc: "Enables those plan insight notices that help with getting fast path queries. Don't turn on before #9492 is fixed!",
2188        default: false,
2189        enable_for_item_parsing: false,
2190    },
2191    {
2192        name: enable_with_ordinality_legacy_fallback,
2193        desc: "When the new WITH ORDINALITY implementation can't be used with a table func, whether to fall back to the legacy implementation or error out.",
2194        default: false,
2195        enable_for_item_parsing: true,
2196    },
2197    {
2198        name: enable_frontend_peek_sequencing, // currently, changes only take effect for new sessions
2199        desc: "Enables the new peek sequencing code, which does most of its work in the Adapter Frontend instead of the Coordinator main task.",
2200        default: true,
2201        enable_for_item_parsing: false,
2202    },
2203    {
2204        name: enable_replacement_materialized_views,
2205        desc: "Whether to enable replacement materialized views.",
2206        default: true,
2207        enable_for_item_parsing: true,
2208    },
2209    {
2210        name: enable_cast_elimination,
2211        desc: "Allow the optimizer to eliminate noop casts between values of equivalent representation types.",
2212        default: true,
2213        enable_for_item_parsing: false,
2214    },
2215    {
2216        // Just an escape hatch for the unlikely case that we have some user who is doing such
2217        // queries. Can be removed after one week in prod.
2218        // https://github.com/MaterializeInc/database-issues/issues/10004
        //
        // NOTE(review): unlike the other entries this flag *restricts* behavior when on, and
        // the generated `enable_all_feature_flags_by_default` sets every listed flag
        // (including this one) to "on" — confirm that is the intended outcome here.
2219        name: disallow_unmaterializable_functions_as_of,
2220        desc: "Prohibits calling unmaterializable functions (except `mz_now`) in AS OF queries.",
2221        default: true,
2222        enable_for_item_parsing: false,
2223    },
2224    {
2225        name: enable_case_literal_transform,
2226        desc: "Allow the optimizer to rewrite If-chains matching a single expression against literals into a CaseLiteral lookup.",
2227        default: false,
2228        enable_for_item_parsing: false,
2229    },
2230    {
2231        name: enable_simplify_quantified_comparisons,
2232        desc: "Allow the optimizer to simplify quantified comparisons in JOIN ON clauses into semi/anti-join EXISTS form during HIR-to-MIR lowering.",
2233        default: true,
2234        enable_for_item_parsing: false,
2235    },
2236    {
2237        name: enable_coalesce_case_transform,
2238        desc: "Allow the optimizer to push `COALESCE` into `CASE WHEN`.",
2239        default: true,
2240        enable_for_item_parsing: false,
2241    },
2242);
2243
/// Builds an [`OptimizerFeatures`] value from the current system variables.
///
/// Every field is read from the same-named `SystemVars` getter, with one exception:
/// `reoptimize_imported_views` is hard-coded to `false`. Most of the getters are generated by
/// the `feature_flags!` invocation above; `enable_consolidate_after_union_negate` and
/// `persist_fast_path_limit` are not entries in that list, so their getters are defined
/// elsewhere.
2244impl From<&super::SystemVars> for OptimizerFeatures {
    /// Reads each optimizer-related variable from `vars` into a new `OptimizerFeatures`.
2245    fn from(vars: &super::SystemVars) -> Self {
2246        Self {
2247            enable_consolidate_after_union_negate: vars.enable_consolidate_after_union_negate(),
2248            enable_eager_delta_joins: vars.enable_eager_delta_joins(),
2249            enable_new_outer_join_lowering: vars.enable_new_outer_join_lowering(),
2250            enable_reduce_mfp_fusion: vars.enable_reduce_mfp_fusion(),
2251            enable_variadic_left_join_lowering: vars.enable_variadic_left_join_lowering(),
2252            enable_letrec_fixpoint_analysis: vars.enable_letrec_fixpoint_analysis(),
2253            enable_cardinality_estimates: vars.enable_cardinality_estimates(),
2254            persist_fast_path_limit: vars.persist_fast_path_limit(),
            // Not backed by a system variable; always off when built from `SystemVars`.
2255            reoptimize_imported_views: false,
2256            enable_join_prioritize_arranged: vars.enable_join_prioritize_arranged(),
2257            enable_projection_pushdown_after_relation_cse: vars
2258                .enable_projection_pushdown_after_relation_cse(),
2259            enable_less_reduce_in_eqprop: vars.enable_less_reduce_in_eqprop(),
2260            enable_dequadratic_eqprop_map: vars.enable_dequadratic_eqprop_map(),
2261            enable_eq_classes_withholding_errors: vars.enable_eq_classes_withholding_errors(),
2262            enable_fast_path_plan_insights: vars.enable_fast_path_plan_insights(),
2263            enable_cast_elimination: vars.enable_cast_elimination(),
2264            enable_case_literal_transform: vars.enable_case_literal_transform(),
2265            enable_simplify_quantified_comparisons: vars.enable_simplify_quantified_comparisons(),
2266            enable_coalesce_case_transform: vars.enable_coalesce_case_transform(),
2267        }
2268    }
2269}
2270
#[cfg(test)]
mod tests {
    use super::*;
    use crate::session::vars::SystemVars;

    /// Checks that no var backing an optimizer feature is flipped by `enable_for_item_parsing`.
    ///
    /// Plan caching during item parsing relies on this: a cached plan records the optimizer
    /// features it was produced with, and a mismatch on lookup results in a cache miss.
    #[mz_ore::test]
    fn optimizer_features_no_enable_for_item_parsing() {
        // Start from default `OptimizerFeatures` (all features off) and destructure it
        // exhaustively. Because the pattern names every field, adding a new optimizer feature
        // without updating this test fails to compile, so the test cannot silently go stale.
        let false_features = OptimizerFeatures::default();
        let OptimizerFeatures {
            enable_eq_classes_withholding_errors,
            enable_consolidate_after_union_negate,
            enable_eager_delta_joins,
            enable_letrec_fixpoint_analysis,
            enable_new_outer_join_lowering,
            enable_reduce_mfp_fusion,
            enable_variadic_left_join_lowering,
            enable_cardinality_estimates,
            persist_fast_path_limit,
            reoptimize_imported_views,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            enable_less_reduce_in_eqprop,
            enable_dequadratic_eqprop_map,
            enable_fast_path_plan_insights,
            enable_cast_elimination,
            enable_case_literal_transform,
            enable_simplify_quantified_comparisons,
            enable_coalesce_case_transform,
        } = false_features;

        // This field has no corresponding system var, so there is nothing to assign.
        let _ = reoptimize_imported_views;

        let mut vars = SystemVars::new();

        // For each listed identifier, assigns the local's value to the system var of the same
        // name, panicking if the assignment is rejected.
        macro_rules! set_vars {
            ($($var:ident),+ $(,)?) => {
                $(
                    vars.set(stringify!($var), VarInput::Flat(&$var.to_string()))
                        .unwrap();
                )+
            };
        }

        set_vars!(
            enable_eq_classes_withholding_errors,
            enable_consolidate_after_union_negate,
            enable_eager_delta_joins,
            enable_letrec_fixpoint_analysis,
            enable_new_outer_join_lowering,
            enable_reduce_mfp_fusion,
            enable_variadic_left_join_lowering,
            enable_cardinality_estimates,
            persist_fast_path_limit,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            enable_less_reduce_in_eqprop,
            enable_dequadratic_eqprop_map,
            enable_fast_path_plan_insights,
            enable_cast_elimination,
            enable_case_literal_transform,
            enable_simplify_quantified_comparisons,
            enable_coalesce_case_transform,
        );

        // Switching on everything that is enabled for item parsing must leave the derived
        // optimizer features exactly as they were.
        vars.enable_for_item_parsing();
        let features_for_item_parsing = OptimizerFeatures::from(&vars);
        assert_eq!(features_for_item_parsing, false_features);
    }
}