Skip to main content

mz_sql/session/vars/
definitions.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Cow;
11use std::num::NonZeroU32;
12use std::str::FromStr;
13use std::sync::Arc;
14use std::sync::LazyLock;
15use std::time::Duration;
16
17use chrono::{DateTime, Utc};
18use derivative::Derivative;
19use mz_adapter_types::timestamp_oracle::{
20    DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT,
21    DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER,
22};
23use mz_ore::cast::{self, CastFrom};
24use mz_repr::adt::numeric::Numeric;
25use mz_repr::adt::timestamp::CheckedTimestamp;
26use mz_repr::bytes::ByteSize;
27use mz_repr::optimize::OptimizerFeatures;
28use mz_sql_parser::ast::Ident;
29use mz_sql_parser::ident;
30use mz_storage_types::parameters::REPLICA_STATUS_HISTORY_RETENTION_WINDOW_DEFAULT;
31use mz_storage_types::parameters::{
32    DEFAULT_PG_SOURCE_CONNECT_TIMEOUT, DEFAULT_PG_SOURCE_TCP_CONFIGURE_SERVER,
33    DEFAULT_PG_SOURCE_TCP_KEEPALIVES_IDLE, DEFAULT_PG_SOURCE_TCP_KEEPALIVES_INTERVAL,
34    DEFAULT_PG_SOURCE_TCP_KEEPALIVES_RETRIES, DEFAULT_PG_SOURCE_TCP_USER_TIMEOUT,
35    DEFAULT_PG_SOURCE_WAL_SENDER_TIMEOUT, STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT,
36};
37use mz_tracing::{CloneableEnvFilter, SerializableDirective};
38use uncased::UncasedStr;
39
40use crate::session::user::{SUPPORT_USER, SYSTEM_USER, User};
41use crate::session::vars::constraints::{
42    BYTESIZE_AT_LEAST_1MB, DomainConstraint, NUMERIC_BOUNDED_0_1_INCLUSIVE, NUMERIC_NON_NEGATIVE,
43    ValueConstraint,
44};
45use crate::session::vars::errors::VarError;
46use crate::session::vars::polyfill::{LazyValueFn, lazy_value, value};
47use crate::session::vars::value::{
48    ClientEncoding, ClientSeverity, DEFAULT_DATE_STYLE, Failpoints, IntervalStyle, IsolationLevel,
49    TimeZone, Value,
50};
51use crate::session::vars::{FeatureFlag, Var, VarInput, VarParseError};
52use crate::{DEFAULT_SCHEMA, WEBHOOK_CONCURRENCY_LIMIT};
53
54/// Definition of a variable.
55#[derive(Clone, Derivative)]
56#[derivative(Debug)]
57pub struct VarDefinition {
58    /// Name of the variable, case-insensitive matching.
59    pub name: &'static UncasedStr,
60    /// Description of the variable.
61    pub description: &'static str,
62    /// Is the variable visible to users, when false only visible to system users.
63    pub user_visible: bool,
64
65    /// Default compiled in value for this variable.
66    pub value: VarDefaultValue,
67    /// Constraint that must be upheld for this variable to be valid.
68    pub constraint: Option<ValueConstraint>,
69    /// When set, prevents getting or setting the variable unless the specified
70    /// feature flag is enabled.
71    pub require_feature_flag: Option<&'static FeatureFlag>,
72
73    /// Method to parse [`VarInput`] into a type that implements [`Value`].
74    ///
75    /// The reason `parse` exists as a function pointer is because we want to achieve two things:
76    ///   1. `VarDefinition` has no generic parameters.
77    ///   2. `Value::parse` returns an instance of `Self`.
78    /// `VarDefinition` holds a `dyn Value`, but `Value::parse` is not object safe because it
79    /// returns `Self`, so we can't call that method. We could change `Value::parse` to return a
80    /// `Box<dyn Value>` making it object safe, but that creates a footgun where it's possible for
81    /// `Value::parse` to return a type that isn't `Self`, e.g. `<String as Value>::parse` could
82    /// return a `usize`!
83    ///
84    /// So to prevent making `VarDefinition` generic over some type `V: Value`, but also defining
85    /// `Value::parse` as returning `Self`, we store a static function pointer to the `parse`
86    /// implementation of our default value.
87    #[derivative(Debug = "ignore")]
88    parse: fn(VarInput) -> Result<Box<dyn Value>, VarParseError>,
89    /// Returns a human readable name for the type of this variable. We store this as a static
90    /// function pointer for the same reason as `parse`.
91    #[derivative(Debug = "ignore")]
92    type_name: fn() -> Cow<'static, str>,
93}
94static_assertions::assert_impl_all!(VarDefinition: Send, Sync);
95
96impl VarDefinition {
97    /// Create a new [`VarDefinition`] in a const context with a value known at compile time.
98    pub const fn new<V: Value>(
99        name: &'static str,
100        value: &'static V,
101        description: &'static str,
102        user_visible: bool,
103    ) -> Self {
104        VarDefinition {
105            name: UncasedStr::new(name),
106            description,
107            value: VarDefaultValue::Static(value),
108            user_visible,
109            parse: V::parse_dyn_value,
110            type_name: V::type_name,
111            constraint: None,
112            require_feature_flag: None,
113        }
114    }
115
116    /// Create a new [`VarDefinition`] in a const context with a lazily evaluated value.
117    pub const fn new_lazy<V: Value, L: LazyValueFn<V>>(
118        name: &'static str,
119        _value: L,
120        description: &'static str,
121        user_visible: bool,
122    ) -> Self {
123        VarDefinition {
124            name: UncasedStr::new(name),
125            description,
126            value: VarDefaultValue::Lazy(L::LAZY_VALUE_FN),
127            user_visible,
128            parse: V::parse_dyn_value,
129            type_name: V::type_name,
130            constraint: None,
131            require_feature_flag: None,
132        }
133    }
134
135    /// Create a new [`VarDefinition`] with a value known at runtime.
136    pub fn new_runtime<V: Value>(
137        name: &'static str,
138        value: V,
139        description: &'static str,
140        user_visible: bool,
141    ) -> Self {
142        VarDefinition {
143            name: UncasedStr::new(name),
144            description,
145            value: VarDefaultValue::Runtime(Arc::new(value)),
146            user_visible,
147            parse: V::parse_dyn_value,
148            type_name: V::type_name,
149            constraint: None,
150            require_feature_flag: None,
151        }
152    }
153
154    /// TODO(parkmycar): Refactor this method onto a `VarDefinitionBuilder` that would allow us to
155    /// constrain `V` here to be the same `V` used in [`VarDefinition::new`].
156    pub const fn with_constraint<V: Value, D: DomainConstraint<Value = V>>(
157        mut self,
158        constraint: &'static D,
159    ) -> Self {
160        self.constraint = Some(ValueConstraint::Domain(constraint));
161        self
162    }
163
164    pub const fn fixed(mut self) -> Self {
165        self.constraint = Some(ValueConstraint::Fixed);
166        self
167    }
168
169    pub const fn read_only(mut self) -> Self {
170        self.constraint = Some(ValueConstraint::ReadOnly);
171        self
172    }
173
174    pub const fn with_feature_flag(mut self, feature_flag: &'static FeatureFlag) -> Self {
175        self.require_feature_flag = Some(feature_flag);
176        self
177    }
178
179    pub fn parse(&self, input: VarInput) -> Result<Box<dyn Value>, VarError> {
180        (self.parse)(input).map_err(|err| err.into_var_error(self))
181    }
182
183    pub fn default_value(&self) -> &'_ dyn Value {
184        self.value.value()
185    }
186}
187
188impl Var for VarDefinition {
189    fn name(&self) -> &'static str {
190        self.name.as_str()
191    }
192
193    fn value(&self) -> String {
194        self.default_value().format()
195    }
196
197    fn description(&self) -> &'static str {
198        self.description
199    }
200
201    fn type_name(&self) -> Cow<'static, str> {
202        (self.type_name)()
203    }
204
205    fn visible(&self, user: &User, system_vars: &super::SystemVars) -> Result<(), VarError> {
206        if !self.user_visible && user != &*SYSTEM_USER && user != &*SUPPORT_USER {
207            Err(VarError::UnknownParameter(self.name().to_string()))
208        } else if self.is_unsafe() && !system_vars.allow_unsafe() {
209            Err(VarError::RequiresUnsafeMode(self.name()))
210        } else {
211            if let Some(flag) = self.require_feature_flag {
212                flag.require(system_vars)?;
213            }
214
215            Ok(())
216        }
217    }
218}
219
220/// The kinds of compiled in default values that can be used with [`VarDefinition`].
221#[derive(Clone, Debug)]
222pub enum VarDefaultValue {
223    /// Static that can be evaluated at compile time.
224    Static(&'static dyn Value),
225    /// Lazy value that is defined at compile time, but created at runtime.
226    Lazy(fn() -> &'static dyn Value),
227    /// Value created at runtime. Note: This is generally an escape hatch.
228    Runtime(Arc<dyn Value>),
229}
230
231impl VarDefaultValue {
232    pub fn value(&self) -> &'_ dyn Value {
233        match self {
234            VarDefaultValue::Static(s) => *s,
235            VarDefaultValue::Lazy(l) => (l)(),
236            VarDefaultValue::Runtime(r) => r.as_ref(),
237        }
238    }
239}
240
241// We pretend to be Postgres v9.5.0, which is also what CockroachDB pretends to
242// be. Too new and some clients will emit a "server too new" warning. Too old
243// and some clients will fall back to legacy code paths. v9.5.0 empirically
244// seems to be a good compromise.
245
246/// The major version of PostgreSQL that Materialize claims to be.
247pub const SERVER_MAJOR_VERSION: u8 = 9;
248
249/// The minor version of PostgreSQL that Materialize claims to be.
250pub const SERVER_MINOR_VERSION: u8 = 5;
251
252/// The patch version of PostgreSQL that Materialize claims to be.
253pub const SERVER_PATCH_VERSION: u8 = 0;
254
255/// The name of the default database that Materialize uses.
256pub const DEFAULT_DATABASE_NAME: &str = "materialize";
257
258pub static APPLICATION_NAME: VarDefinition = VarDefinition::new(
259    "application_name",
260    value!(String; String::new()),
261    "Sets the application name to be reported in statistics and logs (PostgreSQL).",
262    true,
263);
264
265pub static CLIENT_ENCODING: VarDefinition = VarDefinition::new(
266    "client_encoding",
267    value!(ClientEncoding; ClientEncoding::Utf8),
268    "Sets the client's character set encoding (PostgreSQL).",
269    true,
270);
271
272pub static CLIENT_MIN_MESSAGES: VarDefinition = VarDefinition::new(
273    "client_min_messages",
274    value!(ClientSeverity; ClientSeverity::Notice),
275    "Sets the message levels that are sent to the client (PostgreSQL).",
276    true,
277);
278
279pub static CLUSTER: VarDefinition = VarDefinition::new_lazy(
280    "cluster",
281    lazy_value!(String; || "quickstart".to_string()),
282    "Sets the current cluster (Materialize).",
283    true,
284);
285
286pub static CLUSTER_REPLICA: VarDefinition = VarDefinition::new(
287    "cluster_replica",
288    value!(Option<String>; None),
289    "Sets a target cluster replica for SELECT queries (Materialize).",
290    true,
291);
292
293pub static CURRENT_OBJECT_MISSING_WARNINGS: VarDefinition = VarDefinition::new(
294    "current_object_missing_warnings",
295    value!(bool; true),
296    "Whether to emit warnings when the current database, schema, or cluster is missing (Materialize).",
297    true,
298);
299
300pub static DATABASE: VarDefinition = VarDefinition::new_lazy(
301    "database",
302    lazy_value!(String; || DEFAULT_DATABASE_NAME.to_string()),
303    "Sets the current database (CockroachDB).",
304    true,
305);
306
307pub static DATE_STYLE: VarDefinition = VarDefinition::new(
308    // DateStyle has nonstandard capitalization for historical reasons.
309    "DateStyle",
310    &DEFAULT_DATE_STYLE,
311    "Sets the display format for date and time values (PostgreSQL).",
312    true,
313);
314
315pub static DEFAULT_CLUSTER_REPLICATION_FACTOR: VarDefinition = VarDefinition::new(
316    "default_cluster_replication_factor",
317    value!(u32; 1),
318    "Default cluster replication factor (Materialize).",
319    true,
320);
321
322pub static EXTRA_FLOAT_DIGITS: VarDefinition = VarDefinition::new(
323    "extra_float_digits",
324    value!(i32; 3),
325    "Adjusts the number of digits displayed for floating-point values (PostgreSQL).",
326    true,
327);
328
329pub static FAILPOINTS: VarDefinition = VarDefinition::new(
330    "failpoints",
331    value!(Failpoints; Failpoints),
332    "Allows failpoints to be dynamically activated.",
333    true,
334);
335
336pub static INTEGER_DATETIMES: VarDefinition = VarDefinition::new(
337    "integer_datetimes",
338    value!(bool; true),
339    "Reports whether the server uses 64-bit-integer dates and times (PostgreSQL).",
340    true,
341)
342.fixed();
343
344pub static INTERVAL_STYLE: VarDefinition = VarDefinition::new(
345    // IntervalStyle has nonstandard capitalization for historical reasons.
346    "IntervalStyle",
347    value!(IntervalStyle; IntervalStyle::Postgres),
348    "Sets the display format for interval values (PostgreSQL).",
349    true,
350);
351
352pub const MZ_VERSION_NAME: &UncasedStr = UncasedStr::new("mz_version");
353pub const IS_SUPERUSER_NAME: &UncasedStr = UncasedStr::new("is_superuser");
354
355// Schema can be used an alias for a search path with a single element.
356pub const SCHEMA_ALIAS: &UncasedStr = UncasedStr::new("schema");
357pub static SEARCH_PATH: VarDefinition = VarDefinition::new_lazy(
358    "search_path",
359    lazy_value!(Vec<Ident>; || vec![ident!(DEFAULT_SCHEMA)]),
360    "Sets the schema search order for names that are not schema-qualified (PostgreSQL).",
361    true,
362);
363
364pub static STATEMENT_TIMEOUT: VarDefinition = VarDefinition::new(
365    "statement_timeout",
366    value!(Duration; Duration::from_secs(60)),
367    "Sets the maximum allowed duration of INSERT...SELECT, UPDATE, and DELETE operations. \
368    If this value is specified without units, it is taken as milliseconds.",
369    true,
370);
371
372pub static IDLE_IN_TRANSACTION_SESSION_TIMEOUT: VarDefinition = VarDefinition::new(
373    "idle_in_transaction_session_timeout",
374    value!(Duration; Duration::from_secs(60 * 2)),
375    "Sets the maximum allowed duration that a session can sit idle in a transaction before \
376    being terminated. If this value is specified without units, it is taken as milliseconds. \
377    A value of zero disables the timeout (PostgreSQL).",
378    true,
379);
380
381pub static SERVER_VERSION: VarDefinition = VarDefinition::new_lazy(
382    "server_version",
383    lazy_value!(String; || {
384        format!("{SERVER_MAJOR_VERSION}.{SERVER_MINOR_VERSION}.{SERVER_PATCH_VERSION}")
385    }),
386    "Shows the PostgreSQL compatible server version (PostgreSQL).",
387    true,
388)
389.read_only();
390
391pub static SERVER_VERSION_NUM: VarDefinition = VarDefinition::new(
392    "server_version_num",
393    value!(i32; (cast::u8_to_i32(SERVER_MAJOR_VERSION) * 10_000)
394        + (cast::u8_to_i32(SERVER_MINOR_VERSION) * 100)
395        + cast::u8_to_i32(SERVER_PATCH_VERSION)),
396    "Shows the PostgreSQL compatible server version as an integer (PostgreSQL).",
397    true,
398)
399.read_only();
400
401pub static SQL_SAFE_UPDATES: VarDefinition = VarDefinition::new(
402    "sql_safe_updates",
403    value!(bool; false),
404    "Prohibits SQL statements that may be overly destructive (CockroachDB).",
405    true,
406);
407
408pub static STANDARD_CONFORMING_STRINGS: VarDefinition = VarDefinition::new(
409    "standard_conforming_strings",
410    value!(bool; true),
411    "Causes '...' strings to treat backslashes literally (PostgreSQL).",
412    true,
413)
414.fixed();
415
416pub static TIMEZONE: VarDefinition = VarDefinition::new(
417    // TimeZone has nonstandard capitalization for historical reasons.
418    "TimeZone",
419    value!(TimeZone; TimeZone::UTC),
420    "Sets the time zone for displaying and interpreting time stamps (PostgreSQL).",
421    true,
422);
423
424pub const TRANSACTION_ISOLATION_VAR_NAME: &str = "transaction_isolation";
425pub static TRANSACTION_ISOLATION: VarDefinition = VarDefinition::new(
426    TRANSACTION_ISOLATION_VAR_NAME,
427    value!(IsolationLevel; IsolationLevel::StrictSerializable),
428    "Sets the current transaction's isolation level (PostgreSQL).",
429    true,
430);
431
432pub static MAX_KAFKA_CONNECTIONS: VarDefinition = VarDefinition::new(
433    "max_kafka_connections",
434    value!(u32; 1000),
435    "The maximum number of Kafka connections in the region, across all schemas (Materialize).",
436    true,
437);
438
439pub static MAX_POSTGRES_CONNECTIONS: VarDefinition = VarDefinition::new(
440    "max_postgres_connections",
441    value!(u32; 1000),
442    "The maximum number of PostgreSQL connections in the region, across all schemas (Materialize).",
443    true,
444);
445
446pub static MAX_MYSQL_CONNECTIONS: VarDefinition = VarDefinition::new(
447    "max_mysql_connections",
448    value!(u32; 1000),
449    "The maximum number of MySQL connections in the region, across all schemas (Materialize).",
450    true,
451);
452
453pub static MAX_SQL_SERVER_CONNECTIONS: VarDefinition = VarDefinition::new(
454    "max_sql_server_connections",
455    value!(u32; 1000),
456    "The maximum number of SQL Server connections in the region, across all schemas (Materialize).",
457    true,
458);
459
460pub static MAX_AWS_PRIVATELINK_CONNECTIONS: VarDefinition = VarDefinition::new(
461    "max_aws_privatelink_connections",
462    value!(u32; 0),
463    "The maximum number of AWS PrivateLink connections in the region, across all schemas (Materialize).",
464    true,
465);
466
467pub static MAX_TABLES: VarDefinition = VarDefinition::new(
468    "max_tables",
469    value!(u32; 200),
470    "The maximum number of tables in the region, across all schemas (Materialize).",
471    true,
472);
473
474pub static MAX_SOURCES: VarDefinition = VarDefinition::new(
475    "max_sources",
476    value!(u32; 200),
477    "The maximum number of sources in the region, across all schemas (Materialize).",
478    true,
479);
480
481pub static MAX_SINKS: VarDefinition = VarDefinition::new(
482    "max_sinks",
483    value!(u32; 1000),
484    "The maximum number of sinks in the region, across all schemas (Materialize).",
485    true,
486);
487
488pub static MAX_MATERIALIZED_VIEWS: VarDefinition = VarDefinition::new(
489    "max_materialized_views",
490    value!(u32; 500),
491    "The maximum number of materialized views in the region, across all schemas (Materialize).",
492    true,
493);
494
495pub static MAX_CLUSTERS: VarDefinition = VarDefinition::new(
496    "max_clusters",
497    value!(u32; 25),
498    "The maximum number of clusters in the region (Materialize).",
499    true,
500);
501
502pub static MAX_REPLICAS_PER_CLUSTER: VarDefinition = VarDefinition::new(
503    "max_replicas_per_cluster",
504    value!(u32; 5),
505    "The maximum number of replicas of a single cluster (Materialize).",
506    true,
507);
508
509pub static MAX_CREDIT_CONSUMPTION_RATE: VarDefinition = VarDefinition::new_lazy(
510    "max_credit_consumption_rate",
511    lazy_value!(Numeric; || 1024.into()),
512    "The maximum rate of credit consumption in a region. Credits are consumed based on the size of cluster replicas in use (Materialize).",
513    true,
514)
515.with_constraint(&NUMERIC_NON_NEGATIVE);
516
517pub static MAX_DATABASES: VarDefinition = VarDefinition::new(
518    "max_databases",
519    value!(u32; 1000),
520    "The maximum number of databases in the region (Materialize).",
521    true,
522);
523
524pub static MAX_SCHEMAS_PER_DATABASE: VarDefinition = VarDefinition::new(
525    "max_schemas_per_database",
526    value!(u32; 1000),
527    "The maximum number of schemas in a database (Materialize).",
528    true,
529);
530
531pub static MAX_OBJECTS_PER_SCHEMA: VarDefinition = VarDefinition::new(
532    "max_objects_per_schema",
533    value!(u32; 1000),
534    "The maximum number of objects in a schema (Materialize).",
535    true,
536);
537
538pub static MAX_SECRETS: VarDefinition = VarDefinition::new(
539    "max_secrets",
540    value!(u32; 100),
541    "The maximum number of secrets in the region, across all schemas (Materialize).",
542    true,
543);
544
545pub static MAX_ROLES: VarDefinition = VarDefinition::new(
546    "max_roles",
547    value!(u32; 1000),
548    "The maximum number of roles in the region (Materialize).",
549    true,
550);
551
552pub static MAX_NETWORK_POLICIES: VarDefinition = VarDefinition::new(
553    "max_network_policies",
554    value!(u32; 25),
555    "The maximum number of network policies in the region.",
556    true,
557);
558
559pub static MAX_RULES_PER_NETWORK_POLICY: VarDefinition = VarDefinition::new(
560    "max_rules_per_network_policy",
561    value!(u32; 25),
562    "The maximum number of rules per network policies.",
563    true,
564);
565
566// Cloud environmentd is configured with 4 GiB of RAM, so 1 GiB is a good heuristic for a single
567// query.
568//
569// We constrain this parameter to a minimum of 1MB, to avoid accidental usage of values that will
570// interfere with queries executed by the system itself.
571//
572// TODO(jkosh44) Eventually we want to be able to return arbitrary sized results.
573pub static MAX_RESULT_SIZE: VarDefinition = VarDefinition::new(
574    "max_result_size",
575    value!(ByteSize; ByteSize::gb(1)),
576    "The maximum size in bytes for an internal query result (Materialize).",
577    true,
578)
579.with_constraint(&BYTESIZE_AT_LEAST_1MB);
580
581pub static MAX_QUERY_RESULT_SIZE: VarDefinition = VarDefinition::new(
582    "max_query_result_size",
583    value!(ByteSize; ByteSize::gb(1)),
584    "The maximum size in bytes for a single query's result (Materialize).",
585    true,
586);
587
588pub static MAX_COPY_FROM_ROW_SIZE: VarDefinition = VarDefinition::new(
589    "max_copy_from_row_size",
590    value!(ByteSize; ByteSize::mb(128)),
591    "The maximum size in bytes for a single COPY FROM STDIN row (Materialize).",
592    true,
593);
594
595pub static MAX_IDENTIFIER_LENGTH: VarDefinition = VarDefinition::new(
596    "max_identifier_length",
597    value!(usize; mz_sql_lexer::lexer::MAX_IDENTIFIER_LENGTH),
598    "The maximum length of object identifiers in bytes (PostgreSQL).",
599    true,
600);
601
602pub static WELCOME_MESSAGE: VarDefinition = VarDefinition::new(
603    "welcome_message",
604    value!(bool; true),
605    "Whether to send a notice with a welcome message after a successful connection (Materialize).",
606    true,
607);
608
609/// The logical compaction window for builtin tables and sources that have the
610/// `retained_metrics_relation` flag set.
611///
612/// The existence of this variable is a bit of a hack until we have a fully
613/// general solution for controlling retention windows.
614pub static METRICS_RETENTION: VarDefinition = VarDefinition::new(
615    "metrics_retention",
616    // 30 days
617    value!(Duration; Duration::from_secs(30 * 24 * 60 * 60)),
618    "The time to retain cluster utilization metrics (Materialize).",
619    false,
620);
621
622pub static ALLOWED_CLUSTER_REPLICA_SIZES: VarDefinition = VarDefinition::new(
623    "allowed_cluster_replica_sizes",
624    value!(Vec<Ident>; Vec::new()),
625    "The allowed sizes when creating a new cluster replica (Materialize).",
626    true,
627);
628
629pub static PERSIST_FAST_PATH_LIMIT: VarDefinition = VarDefinition::new(
630    "persist_fast_path_limit",
631    value!(usize; 25),
632    "An exclusive upper bound on the number of results we may return from a Persist fast-path peek; \
633    queries that may return more results will follow the normal / slow path. \
634    Setting this to 0 disables the feature.",
635    false,
636);
637
638/// Controls `mz_adapter::coord::timestamp_oracle::postgres_oracle::DynamicConfig::pg_connection_pool_max_size`.
639pub static PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE: VarDefinition = VarDefinition::new(
640    "pg_timestamp_oracle_connection_pool_max_size",
641    value!(usize; DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE),
642    "Maximum size of the Postgres/CRDB connection pool, used by the Postgres/CRDB timestamp oracle.",
643    false,
644);
645
646/// Controls `mz_adapter::coord::timestamp_oracle::postgres_oracle::DynamicConfig::pg_connection_pool_max_wait`.
647pub static PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT: VarDefinition = VarDefinition::new(
648    "pg_timestamp_oracle_connection_pool_max_wait",
649    value!(Option<Duration>; Some(DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT)),
650    "The maximum time to wait when attempting to obtain a connection from the Postgres/CRDB connection pool, used by the Postgres/CRDB timestamp oracle.",
651    false,
652);
653
654/// Controls `mz_adapter::coord::timestamp_oracle::postgres_oracle::DynamicConfig::pg_connection_pool_ttl`.
655pub static PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL: VarDefinition = VarDefinition::new(
656    "pg_timestamp_oracle_connection_pool_ttl",
657    value!(Duration; DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL),
658    "The minimum TTL of a Consensus connection to Postgres/CRDB before it is proactively terminated",
659    false,
660);
661
662/// Controls `mz_adapter::coord::timestamp_oracle::postgres_oracle::DynamicConfig::pg_connection_pool_ttl_stagger`.
663pub static PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER: VarDefinition = VarDefinition::new(
664    "pg_timestamp_oracle_connection_pool_ttl_stagger",
665    value!(Duration; DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER),
666    "The minimum time between TTLing Consensus connections to Postgres/CRDB.",
667    false,
668);
669
670pub static UNSAFE_NEW_TRANSACTION_WALL_TIME: VarDefinition = VarDefinition::new(
671    "unsafe_new_transaction_wall_time",
672    value!(Option<CheckedTimestamp<DateTime<Utc>>>; None),
673    "Sets the wall time for all new explicit or implicit transactions to control the value of `now()`. \
674    If not set, uses the system's clock.",
675    // This needs to be true because `user_visible: false` things are only modifiable by the mz_system
676    // and mz_support users, and we want sqllogictest to have access with its user. Because the name
677    // starts with "unsafe" it still won't be visible or changeable by users unless unsafe mode is
678    // enabled.
679    true,
680);
681
682pub static SCRAM_ITERATIONS: VarDefinition = VarDefinition::new(
683    "scram_iterations",
684    // / The default iteration count as suggested by
685    // / <https://cheatsheetseries.owasp.org/cheatsheets/Password_Storage_Cheat_Sheet.html>
686    value!(NonZeroU32; NonZeroU32::new(600_000).unwrap()),
687    "Iterations to use when hashing passwords. Higher iterations are more secure, but take longer to validated. \
688    Please consider the security risks before reducing this below the default value.",
689    true,
690);
691
692/// Tuning for RocksDB used by `UPSERT` sources that takes effect on restart.
693pub mod upsert_rocksdb {
694    use super::*;
695    use mz_rocksdb_types::config::{CompactionStyle, CompressionType};
696
697    pub static UPSERT_ROCKSDB_COMPACTION_STYLE: VarDefinition = VarDefinition::new(
698        "upsert_rocksdb_compaction_style",
699        value!(CompactionStyle; mz_rocksdb_types::defaults::DEFAULT_COMPACTION_STYLE),
700        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
701        sources. Described in the `mz_rocksdb_types::config` module. \
702        Only takes effect on source restart (Materialize).",
703        false,
704    );
705
706    pub static UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET: VarDefinition =
707        VarDefinition::new(
708            "upsert_rocksdb_optimize_compaction_memtable_budget",
709            value!(usize; mz_rocksdb_types::defaults::DEFAULT_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET),
710            "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
711        sources. Described in the `mz_rocksdb_types::config` module. \
712        Only takes effect on source restart (Materialize).",
713            false,
714        );
715
716    pub static UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES: VarDefinition =
717        VarDefinition::new(
718            "upsert_rocksdb_level_compaction_dynamic_level_bytes",
719            value!(bool; mz_rocksdb_types::defaults::DEFAULT_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES),
720            "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
721        sources. Described in the `mz_rocksdb_types::config` module. \
722        Only takes effect on source restart (Materialize).",
723            false,
724        );
725
726    pub static UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO: VarDefinition = VarDefinition::new(
727        "upsert_rocksdb_universal_compaction_ratio",
728        value!(i32; mz_rocksdb_types::defaults::DEFAULT_UNIVERSAL_COMPACTION_RATIO),
729        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
730        sources. Described in the `mz_rocksdb_types::config` module. \
731        Only takes effect on source restart (Materialize).",
732        false,
733    );
734
735    pub static UPSERT_ROCKSDB_PARALLELISM: VarDefinition = VarDefinition::new(
736        "upsert_rocksdb_parallelism",
737        value!(Option<i32>; mz_rocksdb_types::defaults::DEFAULT_PARALLELISM),
738        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
739        sources. Described in the `mz_rocksdb_types::config` module. \
740        Only takes effect on source restart (Materialize).",
741        false,
742    );
743
744    pub static UPSERT_ROCKSDB_COMPRESSION_TYPE: VarDefinition = VarDefinition::new(
745        "upsert_rocksdb_compression_type",
746        value!(CompressionType; mz_rocksdb_types::defaults::DEFAULT_COMPRESSION_TYPE),
747        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
748        sources. Described in the `mz_rocksdb_types::config` module. \
749        Only takes effect on source restart (Materialize).",
750        false,
751    );
752
753    pub static UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE: VarDefinition = VarDefinition::new(
754        "upsert_rocksdb_bottommost_compression_type",
755        value!(CompressionType; mz_rocksdb_types::defaults::DEFAULT_BOTTOMMOST_COMPRESSION_TYPE),
756        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
757        sources. Described in the `mz_rocksdb_types::config` module. \
758        Only takes effect on source restart (Materialize).",
759        false,
760    );
761
762    pub static UPSERT_ROCKSDB_BATCH_SIZE: VarDefinition = VarDefinition::new(
763        "upsert_rocksdb_batch_size",
764        value!(usize; mz_rocksdb_types::defaults::DEFAULT_BATCH_SIZE),
765        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
766        sources. Described in the `mz_rocksdb_types::config` module. \
767        Can be changed dynamically (Materialize).",
768        false,
769    );
770
771    pub static UPSERT_ROCKSDB_RETRY_DURATION: VarDefinition = VarDefinition::new(
772        "upsert_rocksdb_retry_duration",
773        value!(Duration; mz_rocksdb_types::defaults::DEFAULT_RETRY_DURATION),
774        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
775        sources. Described in the `mz_rocksdb_types::config` module. \
776        Only takes effect on source restart (Materialize).",
777        false,
778    );
779
780    pub static UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS: VarDefinition = VarDefinition::new(
781        "upsert_rocksdb_stats_log_interval_seconds",
782        value!(u32; mz_rocksdb_types::defaults::DEFAULT_STATS_LOG_INTERVAL_S),
783        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
784        sources. Described in the `mz_rocksdb_types::config` module. \
785        Only takes effect on source restart (Materialize).",
786        false,
787    );
788
789    pub static UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS: VarDefinition = VarDefinition::new(
790        "upsert_rocksdb_stats_persist_interval_seconds",
791        value!(u32; mz_rocksdb_types::defaults::DEFAULT_STATS_PERSIST_INTERVAL_S),
792        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
793        sources. Described in the `mz_rocksdb_types::config` module. \
794        Only takes effect on source restart (Materialize).",
795        false,
796    );
797
798    pub static UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB: VarDefinition = VarDefinition::new(
799        "upsert_rocksdb_point_lookup_block_cache_size_mb",
800        value!(Option<u32>; None),
801        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
802        sources. Described in the `mz_rocksdb_types::config` module. \
803        Only takes effect on source restart (Materialize).",
804        false,
805    );
806
807    /// The number of times by which allocated buffers will be shrinked in upsert rocksdb.
808    /// If value is 0, then no shrinking will occur.
809    pub static UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO: VarDefinition = VarDefinition::new(
810        "upsert_rocksdb_shrink_allocated_buffers_by_ratio",
811        value!(usize; mz_rocksdb_types::defaults::DEFAULT_SHRINK_BUFFERS_BY_RATIO),
812        "The number of times by which allocated buffers will be shrinked in upsert rocksdb.",
813        false,
814    );
815
816    /// Only used if `upsert_rocksdb_write_buffer_manager_memory_bytes` is also set
817    /// and write buffer manager is enabled
818    pub static UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION: VarDefinition =
819        VarDefinition::new(
820            "upsert_rocksdb_write_buffer_manager_cluster_memory_fraction",
821            value!(Option<Numeric>; None),
822            "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
823        sources. Described in the `mz_rocksdb_types::config` module. \
824        Only takes effect on source restart (Materialize).",
825            false,
826        );
827
828    /// `upsert_rocksdb_write_buffer_manager_memory_bytes` needs to be set for write buffer manager to be
829    /// used.
830    pub static UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES: VarDefinition = VarDefinition::new(
831        "upsert_rocksdb_write_buffer_manager_memory_bytes",
832        value!(Option<usize>; None),
833        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
834        sources. Described in the `mz_rocksdb_types::config` module. \
835        Only takes effect on source restart (Materialize).",
836        false,
837    );
838
839    pub static UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL: VarDefinition = VarDefinition::new(
840        "upsert_rocksdb_write_buffer_manager_allow_stall",
841        value!(bool; false),
842        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
843        sources. Described in the `mz_rocksdb_types::config` module. \
844        Only takes effect on source restart (Materialize).",
845        false,
846    );
847}
848
849pub static LOGGING_FILTER: VarDefinition = VarDefinition::new_lazy(
850    "log_filter",
851    lazy_value!(CloneableEnvFilter; || CloneableEnvFilter::from_str("info").expect("valid EnvFilter")),
852    "Sets the filter to apply to stderr logging.",
853    false,
854);
855
856pub static OPENTELEMETRY_FILTER: VarDefinition = VarDefinition::new_lazy(
857    "opentelemetry_filter",
858    lazy_value!(CloneableEnvFilter; || CloneableEnvFilter::from_str("info").expect("valid EnvFilter")),
859    "Sets the filter to apply to OpenTelemetry-backed distributed tracing.",
860    false,
861);
862
863pub static LOGGING_FILTER_DEFAULTS: VarDefinition = VarDefinition::new_lazy(
864    "log_filter_defaults",
865    lazy_value!(Vec<SerializableDirective>; || {
866        mz_ore::tracing::LOGGING_DEFAULTS
867            .iter()
868            .map(|d| d.clone().into())
869            .collect()
870    }),
871    "Sets additional default directives to apply to stderr logging. \
872        These apply to all variations of `log_filter`. Directives other than \
873        `module=off` are likely incorrect.",
874    false,
875);
876
877pub static OPENTELEMETRY_FILTER_DEFAULTS: VarDefinition = VarDefinition::new_lazy(
878    "opentelemetry_filter_defaults",
879    lazy_value!(Vec<SerializableDirective>; || {
880        mz_ore::tracing::OPENTELEMETRY_DEFAULTS
881            .iter()
882            .map(|d| d.clone().into())
883            .collect()
884    }),
885    "Sets additional default directives to apply to OpenTelemetry-backed \
886        distributed tracing. \
887        These apply to all variations of `opentelemetry_filter`. Directives other than \
888        `module=off` are likely incorrect.",
889    false,
890);
891
892pub static SENTRY_FILTERS: VarDefinition = VarDefinition::new_lazy(
893    "sentry_filters",
894    lazy_value!(Vec<SerializableDirective>; || {
895        mz_ore::tracing::SENTRY_DEFAULTS
896            .iter()
897            .map(|d| d.clone().into())
898            .collect()
899    }),
900    "Sets additional default directives to apply to sentry logging. \
901        These apply on top of a default `info` directive. Directives other than \
902        `module=off` are likely incorrect.",
903    false,
904);
905
906pub static WEBHOOKS_SECRETS_CACHING_TTL_SECS: VarDefinition = VarDefinition::new_lazy(
907    "webhooks_secrets_caching_ttl_secs",
908    lazy_value!(usize; || {
909        usize::cast_from(mz_secrets::cache::DEFAULT_TTL_SECS)
910    }),
911    "Sets the time-to-live for values in the Webhooks secrets cache.",
912    false,
913);
914
915pub static COORD_SLOW_MESSAGE_WARN_THRESHOLD: VarDefinition = VarDefinition::new(
916    "coord_slow_message_warn_threshold",
917    value!(Duration; Duration::from_secs(30)),
918    "Sets the threshold at which we will error! for a coordinator message being slow.",
919    false,
920);
921
922/// Controls the connect_timeout setting when connecting to PG via `mz_postgres_util`.
923pub static PG_SOURCE_CONNECT_TIMEOUT: VarDefinition = VarDefinition::new(
924    "pg_source_connect_timeout",
925    value!(Duration; DEFAULT_PG_SOURCE_CONNECT_TIMEOUT),
926    "Sets the timeout applied to socket-level connection attempts for PG \
927    replication connections (Materialize).",
928    false,
929);
930
931/// Sets the maximum number of TCP keepalive probes that will be sent before dropping a connection
932/// when connecting to PG via `mz_postgres_util`.
933pub static PG_SOURCE_TCP_KEEPALIVES_RETRIES: VarDefinition = VarDefinition::new(
934    "pg_source_tcp_keepalives_retries",
935    value!(u32; DEFAULT_PG_SOURCE_TCP_KEEPALIVES_RETRIES),
936    "Sets the maximum number of TCP keepalive probes that will be sent before dropping \
937    a connection when connecting to PG via `mz_postgres_util` (Materialize).",
938    false,
939);
940
941/// Sets the amount of idle time before a keepalive packet is sent on the connection when connecting
942/// to PG via `mz_postgres_util`.
943pub static PG_SOURCE_TCP_KEEPALIVES_IDLE: VarDefinition = VarDefinition::new(
944    "pg_source_tcp_keepalives_idle",
945    value!(Duration; DEFAULT_PG_SOURCE_TCP_KEEPALIVES_IDLE),
946    "Sets the amount of idle time before a keepalive packet is sent on the connection \
947        when connecting to PG via `mz_postgres_util` (Materialize).",
948    false,
949);
950
951/// Sets the time interval between TCP keepalive probes when connecting to PG via `mz_postgres_util`.
952pub static PG_SOURCE_TCP_KEEPALIVES_INTERVAL: VarDefinition = VarDefinition::new(
953    "pg_source_tcp_keepalives_interval",
954    value!(Duration; DEFAULT_PG_SOURCE_TCP_KEEPALIVES_INTERVAL),
955    "Sets the time interval between TCP keepalive probes when connecting to PG via \
956        replication (Materialize).",
957    false,
958);
959
960/// Sets the TCP user timeout when connecting to PG via `mz_postgres_util`.
961pub static PG_SOURCE_TCP_USER_TIMEOUT: VarDefinition = VarDefinition::new(
962    "pg_source_tcp_user_timeout",
963    value!(Duration; DEFAULT_PG_SOURCE_TCP_USER_TIMEOUT),
964    "Sets the TCP user timeout when connecting to PG via `mz_postgres_util` (Materialize).",
965    false,
966);
967
968/// Sets whether to apply the TCP configuration parameters on the server when
969/// connecting to PG via `mz_postgres_util`.
970pub static PG_SOURCE_TCP_CONFIGURE_SERVER: VarDefinition = VarDefinition::new(
971    "pg_source_tcp_configure_server",
972    value!(bool; DEFAULT_PG_SOURCE_TCP_CONFIGURE_SERVER),
973    "Sets whether to apply the TCP configuration parameters on the server when connecting to PG via `mz_postgres_util` (Materialize).",
974    false,
975);
976
977/// Sets the `statement_timeout` value to use during the snapshotting phase of
978/// PG sources.
979pub static PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT: VarDefinition = VarDefinition::new(
980    "pg_source_snapshot_statement_timeout",
981    value!(Duration; mz_postgres_util::DEFAULT_SNAPSHOT_STATEMENT_TIMEOUT),
982    "Sets the `statement_timeout` value to use during the snapshotting phase of PG sources (Materialize)",
983    false,
984);
985
986/// Sets the `wal_sender_timeout` value to use during the replication phase of
987/// PG sources.
988pub static PG_SOURCE_WAL_SENDER_TIMEOUT: VarDefinition = VarDefinition::new(
989    "pg_source_wal_sender_timeout",
990    value!(Option<Duration>; DEFAULT_PG_SOURCE_WAL_SENDER_TIMEOUT),
991    "Sets the `wal_sender_timeout` value to use during the replication phase of PG sources (Materialize)",
992    false,
993);
994
995/// Please see `PgSourceSnapshotConfig`.
996pub static PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT: VarDefinition = VarDefinition::new(
997    "pg_source_snapshot_collect_strict_count",
998    value!(bool; mz_storage_types::parameters::PgSourceSnapshotConfig::new().collect_strict_count),
999    "Please see <https://dev.materialize.com/api/rust-private\
1000        /mz_storage_types/parameters\
1001        /struct.PgSourceSnapshotConfig.html#structfield.collect_strict_count>",
1002    false,
1003);
1004
1005/// Sets the time between TCP keepalive probes when connecting to MySQL via `mz_mysql_util`.
1006pub static MYSQL_SOURCE_TCP_KEEPALIVE: VarDefinition = VarDefinition::new(
1007    "mysql_source_tcp_keepalive",
1008    value!(Duration; mz_mysql_util::DEFAULT_TCP_KEEPALIVE),
1009    "Sets the time between TCP keepalive probes when connecting to MySQL",
1010    false,
1011);
1012
1013/// Sets the `max_execution_time` value to use during the snapshotting phase of
1014/// MySQL sources.
1015pub static MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME: VarDefinition = VarDefinition::new(
1016    "mysql_source_snapshot_max_execution_time",
1017    value!(Duration; mz_mysql_util::DEFAULT_SNAPSHOT_MAX_EXECUTION_TIME),
1018    "Sets the `max_execution_time` value to use during the snapshotting phase of MySQL sources (Materialize)",
1019    false,
1020);
1021
1022/// Sets the `lock_wait_timeout` value to use during the snapshotting phase of
1023/// MySQL sources.
1024pub static MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT: VarDefinition = VarDefinition::new(
1025    "mysql_source_snapshot_lock_wait_timeout",
1026    value!(Duration; mz_mysql_util::DEFAULT_SNAPSHOT_LOCK_WAIT_TIMEOUT),
1027    "Sets the `lock_wait_timeout` value to use during the snapshotting phase of MySQL sources (Materialize)",
1028    false,
1029);
1030
1031/// Sets the timeout for establishing an authenticated connection to MySQL
1032pub static MYSQL_SOURCE_CONNECT_TIMEOUT: VarDefinition = VarDefinition::new(
1033    "mysql_source_connect_timeout",
1034    value!(Duration; mz_mysql_util::DEFAULT_CONNECT_TIMEOUT),
1035    "Sets the timeout for establishing an authenticated connection to MySQL",
1036    false,
1037);
1038
1039/// Controls the check interval for connections to SSH bastions via `mz_ssh_util`.
1040pub static SSH_CHECK_INTERVAL: VarDefinition = VarDefinition::new(
1041    "ssh_check_interval",
1042    value!(Duration; mz_ssh_util::tunnel::DEFAULT_CHECK_INTERVAL),
1043    "Controls the check interval for connections to SSH bastions via `mz_ssh_util`.",
1044    false,
1045);
1046
1047/// Controls the connect timeout for connections to SSH bastions via `mz_ssh_util`.
1048pub static SSH_CONNECT_TIMEOUT: VarDefinition = VarDefinition::new(
1049    "ssh_connect_timeout",
1050    value!(Duration; mz_ssh_util::tunnel::DEFAULT_CONNECT_TIMEOUT),
1051    "Controls the connect timeout for connections to SSH bastions via `mz_ssh_util`.",
1052    false,
1053);
1054
1055/// Controls the keepalive idle interval for connections to SSH bastions via `mz_ssh_util`.
1056pub static SSH_KEEPALIVES_IDLE: VarDefinition = VarDefinition::new(
1057    "ssh_keepalives_idle",
1058    value!(Duration; mz_ssh_util::tunnel::DEFAULT_KEEPALIVES_IDLE),
1059    "Controls the keepalive idle interval for connections to SSH bastions via `mz_ssh_util`.",
1060    false,
1061);
1062
1063/// Enables `socket.keepalive.enable` for rdkafka client connections. Defaults to true.
1064pub static KAFKA_SOCKET_KEEPALIVE: VarDefinition = VarDefinition::new(
1065    "kafka_socket_keepalive",
1066    value!(bool; mz_kafka_util::client::DEFAULT_KEEPALIVE),
1067    "Enables `socket.keepalive.enable` for rdkafka client connections. Defaults to true.",
1068    false,
1069);
1070
1071/// Controls `socket.timeout.ms` for rdkafka client connections. Defaults to the rdkafka default
1072/// (60000ms). Cannot be greater than 300000ms, more than 100ms greater than
1073/// `kafka_transaction_timeout`, or less than 10ms.
1074pub static KAFKA_SOCKET_TIMEOUT: VarDefinition = VarDefinition::new(
1075    "kafka_socket_timeout",
1076    value!(Option<Duration>; None),
1077    "Controls `socket.timeout.ms` for rdkafka \
1078        client connections. Defaults to the rdkafka default (60000ms) or \
1079        the set transaction timeout + 100ms, whichever one is smaller. \
1080        Cannot be greater than 300000ms, more than 100ms greater than \
1081        `kafka_transaction_timeout`, or less than 10ms.",
1082    false,
1083);
1084
1085/// Controls `transaction.timeout.ms` for rdkafka client connections. Defaults to the rdkafka default
1086/// (60000ms). Cannot be greater than `i32::MAX` or less than 1000ms.
1087pub static KAFKA_TRANSACTION_TIMEOUT: VarDefinition = VarDefinition::new(
1088    "kafka_transaction_timeout",
1089    value!(Duration; mz_kafka_util::client::DEFAULT_TRANSACTION_TIMEOUT),
1090    "Controls `transaction.timeout.ms` for rdkafka \
1091        client connections. Defaults to the 10min. \
1092        Cannot be greater than `i32::MAX` or less than 1000ms.",
1093    false,
1094);
1095
1096/// Controls `socket.connection.setup.timeout.ms` for rdkafka client connections. Defaults to the rdkafka default
1097/// (30000ms). Cannot be greater than `i32::MAX` or less than 1000ms
1098pub static KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT: VarDefinition = VarDefinition::new(
1099    "kafka_socket_connection_setup_timeout",
1100    value!(Duration; mz_kafka_util::client::DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT),
1101    "Controls `socket.connection.setup.timeout.ms` for rdkafka \
1102        client connections. Defaults to the rdkafka default (30000ms). \
1103        Cannot be greater than `i32::MAX` or less than 1000ms",
1104    false,
1105);
1106
1107/// Controls the timeout when fetching kafka metadata. Defaults to 10s.
1108pub static KAFKA_FETCH_METADATA_TIMEOUT: VarDefinition = VarDefinition::new(
1109    "kafka_fetch_metadata_timeout",
1110    value!(Duration; mz_kafka_util::client::DEFAULT_FETCH_METADATA_TIMEOUT),
1111    "Controls the timeout when fetching kafka metadata. \
1112        Defaults to 10s.",
1113    false,
1114);
1115
1116/// Controls the timeout when fetching kafka progress records. Defaults to 60s.
1117pub static KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT: VarDefinition = VarDefinition::new(
1118    "kafka_progress_record_fetch_timeout",
1119    value!(Option<Duration>; None),
1120    "Controls the timeout when fetching kafka progress records. \
1121        Defaults to 60s or the transaction timeout, whichever one is larger.",
1122    false,
1123);
1124
1125/// The maximum number of in-flight bytes emitted by persist_sources feeding _storage
1126/// dataflows_.
1127/// Currently defaults to 256MiB = 268435456 bytes
1128/// Note: Backpressure will only be turned on if disk is enabled based on
1129/// `storage_dataflow_max_inflight_bytes_disk_only` flag
1130pub static STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES: VarDefinition = VarDefinition::new(
1131    "storage_dataflow_max_inflight_bytes",
1132    value!(Option<usize>; Some(256 * 1024 * 1024)),
1133    "The maximum number of in-flight bytes emitted by persist_sources feeding \
1134        storage dataflows. Defaults to backpressure enabled (Materialize).",
1135    false,
1136);
1137
1138/// Configuration ratio to shrink unusef buffers in upsert by.
1139/// For eg: is 2 is set, then the buffers will be reduced by 2 i.e. halved.
1140/// Default is 0, which means shrinking is disabled.
1141pub static STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO: VarDefinition = VarDefinition::new(
1142    "storage_shrink_upsert_unused_buffers_by_ratio",
1143    value!(usize; 0),
1144    "Configuration ratio to shrink unusef buffers in upsert by",
1145    false,
1146);
1147
1148/// The fraction of the cluster replica size to be used as the maximum number of
1149/// in-flight bytes emitted by persist_sources feeding storage dataflows.
1150/// If not configured, the storage_dataflow_max_inflight_bytes value will be used.
1151/// For this value to be used storage_dataflow_max_inflight_bytes needs to be set.
1152pub static STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION: VarDefinition =
1153    VarDefinition::new_lazy(
1154        "storage_dataflow_max_inflight_bytes_to_cluster_size_fraction",
1155        lazy_value!(Option<Numeric>; || Some(0.01.into())),
1156        "The fraction of the cluster replica size to be used as the maximum number of \
1157            in-flight bytes emitted by persist_sources feeding storage dataflows. \
1158            If not configured, the storage_dataflow_max_inflight_bytes value will be used.",
1159        false,
1160    );
1161
1162pub static STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY: VarDefinition = VarDefinition::new(
1163    "storage_dataflow_max_inflight_bytes_disk_only",
1164    value!(bool; true),
1165    "Whether or not `storage_dataflow_max_inflight_bytes` applies only to \
1166        upsert dataflows using disks. Defaults to true (Materialize).",
1167    false,
1168);
1169
1170/// The interval to submit statistics to `mz_source_statistics_per_worker` and `mz_sink_statistics_per_worker`.
1171pub static STORAGE_STATISTICS_INTERVAL: VarDefinition = VarDefinition::new(
1172    "storage_statistics_interval",
1173    value!(Duration; mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT),
1174    "The interval to submit statistics to `mz_source_statistics_per_worker` \
1175        and `mz_sink_statistics` (Materialize).",
1176    false,
1177);
1178
1179/// The interval to collect statistics for `mz_source_statistics_per_worker` and `mz_sink_statistics_per_worker` in
1180/// clusterd. Controls the accuracy of metrics.
1181pub static STORAGE_STATISTICS_COLLECTION_INTERVAL: VarDefinition = VarDefinition::new(
1182    "storage_statistics_collection_interval",
1183    value!(Duration; mz_storage_types::parameters::STATISTICS_COLLECTION_INTERVAL_DEFAULT),
1184    "The interval to collect statistics for `mz_source_statistics_per_worker` \
1185        and `mz_sink_statistics_per_worker` in clusterd. Controls the accuracy of metrics \
1186        (Materialize).",
1187    false,
1188);
1189
1190pub static STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS: VarDefinition = VarDefinition::new(
1191    "storage_record_source_sink_namespaced_errors",
1192    value!(bool; true),
1193    "Whether or not to record namespaced errors in the status history tables",
1194    false,
1195);
1196
1197/// Boolean flag indicating whether to enable syncing from
1198/// LaunchDarkly. Can be turned off as an emergency measure to still
1199/// be able to alter parameters while LD is broken.
1200pub static ENABLE_LAUNCHDARKLY: VarDefinition = VarDefinition::new(
1201    "enable_launchdarkly",
1202    value!(bool; true),
1203    "Boolean flag indicating whether flag synchronization from LaunchDarkly should be enabled (Materialize).",
1204    false,
1205);
1206
1207/// Feature flag indicating whether real time recency is enabled. Not that
1208/// unlike other feature flags, this is made available at the session level, so
1209/// is additionally gated by a feature flag.
1210pub static REAL_TIME_RECENCY: VarDefinition = VarDefinition::new(
1211    "real_time_recency",
1212    value!(bool; false),
1213    "Feature flag indicating whether real time recency is enabled (Materialize).",
1214    true,
1215)
1216.with_feature_flag(&ALLOW_REAL_TIME_RECENCY);
1217
1218pub static REAL_TIME_RECENCY_TIMEOUT: VarDefinition = VarDefinition::new(
1219    "real_time_recency_timeout",
1220    value!(Duration; Duration::from_secs(10)),
1221    "Sets the maximum allowed duration of SELECTs that actively use real-time \
1222    recency, i.e. reach out to an external system to determine their most recencly exposed \
1223    data (Materialize).",
1224    true,
1225)
1226.with_feature_flag(&ALLOW_REAL_TIME_RECENCY);
1227
1228pub static EMIT_PLAN_INSIGHTS_NOTICE: VarDefinition = VarDefinition::new(
1229    "emit_plan_insights_notice",
1230    value!(bool; false),
1231    "Boolean flag indicating whether to send a NOTICE with JSON-formatted plan insights before executing a SELECT statement (Materialize).",
1232    true,
1233);
1234
1235pub static EMIT_TIMESTAMP_NOTICE: VarDefinition = VarDefinition::new(
1236    "emit_timestamp_notice",
1237    value!(bool; false),
1238    "Boolean flag indicating whether to send a NOTICE with timestamp explanations of queries (Materialize).",
1239    true,
1240);
1241
1242pub static EMIT_TRACE_ID_NOTICE: VarDefinition = VarDefinition::new(
1243    "emit_trace_id_notice",
1244    value!(bool; false),
1245    "Boolean flag indicating whether to send a NOTICE specifying the trace id when available (Materialize).",
1246    true,
1247);
1248
1249pub static UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP: VarDefinition = VarDefinition::new(
1250    "unsafe_mock_audit_event_timestamp",
1251    value!(Option<mz_repr::Timestamp>; None),
1252    "Mocked timestamp to use for audit events for testing purposes",
1253    false,
1254);
1255
1256pub static ENABLE_RBAC_CHECKS: VarDefinition = VarDefinition::new(
1257    "enable_rbac_checks",
1258    value!(bool; true),
1259    "User facing global boolean flag indicating whether to apply RBAC checks before \
1260        executing statements (Materialize).",
1261    true,
1262);
1263
1264pub static ENABLE_SESSION_RBAC_CHECKS: VarDefinition = VarDefinition::new(
1265    "enable_session_rbac_checks",
1266    // TODO(jkosh44) Once RBAC is enabled in all environments, change this to `true`.
1267    value!(bool; false),
1268    "User facing session boolean flag indicating whether to apply RBAC checks before \
1269        executing statements (Materialize).",
1270    true,
1271);
1272
1273pub static EMIT_INTROSPECTION_QUERY_NOTICE: VarDefinition = VarDefinition::new(
1274    "emit_introspection_query_notice",
1275    value!(bool; true),
1276    "Whether to print a notice when querying per-replica introspection sources.",
1277    true,
1278);
1279
1280// TODO(mgree) change this to a SelectOption
1281pub static ENABLE_SESSION_CARDINALITY_ESTIMATES: VarDefinition = VarDefinition::new(
1282    "enable_session_cardinality_estimates",
1283    value!(bool; false),
1284    "Feature flag indicating whether to use cardinality estimates when optimizing queries; \
1285        does not affect EXPLAIN WITH(cardinality) (Materialize).",
1286    true,
1287)
1288.with_feature_flag(&ENABLE_CARDINALITY_ESTIMATES);
1289
1290pub static OPTIMIZER_STATS_TIMEOUT: VarDefinition = VarDefinition::new(
1291    "optimizer_stats_timeout",
1292    value!(Duration; Duration::from_millis(250)),
1293    "Sets the timeout applied to the optimizer's statistics collection from storage; \
1294        applied to non-oneshot, i.e., long-lasting queries, like CREATE MATERIALIZED VIEW (Materialize).",
1295    false,
1296);
1297
1298pub static OPTIMIZER_ONESHOT_STATS_TIMEOUT: VarDefinition = VarDefinition::new(
1299    "optimizer_oneshot_stats_timeout",
1300    value!(Duration; Duration::from_millis(10)),
1301    "Sets the timeout applied to the optimizer's statistics collection from storage; \
1302        applied to oneshot queries, like SELECT (Materialize).",
1303    false,
1304);
1305
1306pub static PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE: VarDefinition = VarDefinition::new(
1307    "privatelink_status_update_quota_per_minute",
1308    value!(u32; 20),
1309    "Sets the per-minute quota for privatelink vpc status updates to be written to \
1310        the storage-collection-backed system table. This value implies the total and burst quota per-minute.",
1311    false,
1312);
1313
1314pub static STATEMENT_LOGGING_SAMPLE_RATE: VarDefinition = VarDefinition::new_lazy(
1315    "statement_logging_sample_rate",
1316    lazy_value!(Numeric; || 0.1.into()),
1317    "User-facing session variable indicating how many statement executions should be \
1318        logged, subject to constraint by the system variable `statement_logging_max_sample_rate` (Materialize).",
1319    true,
1320).with_constraint(&NUMERIC_BOUNDED_0_1_INCLUSIVE);
1321
1322pub static ENABLE_DEFAULT_CONNECTION_VALIDATION: VarDefinition = VarDefinition::new(
1323    "enable_default_connection_validation",
1324    value!(bool; true),
1325    "LD facing global boolean flag that allows turning default connection validation off for everyone (Materialize).",
1326    false,
1327);
1328
1329pub static STATEMENT_LOGGING_MAX_DATA_CREDIT: VarDefinition = VarDefinition::new(
1330    "statement_logging_max_data_credit",
1331    value!(Option<usize>; Some(50 * 1024 * 1024)),
1332    // The idea is that during periods of low logging, tokens can accumulate up to this value,
1333    // and then be depleted during periods of high logging.
1334    "The maximum number of bytes that can be logged for statement logging in short burts, or NULL if unlimited (Materialize).",
1335    false,
1336);
1337
1338pub static STATEMENT_LOGGING_TARGET_DATA_RATE: VarDefinition = VarDefinition::new(
1339    "statement_logging_target_data_rate",
1340    value!(Option<usize>; Some(2071)),
1341    "The maximum sustained data rate of statement logging, in bytes per second, or NULL if unlimited (Materialize).",
1342    false,
1343);
1344
1345pub static STATEMENT_LOGGING_MAX_SAMPLE_RATE: VarDefinition = VarDefinition::new_lazy(
1346    "statement_logging_max_sample_rate",
1347    lazy_value!(Numeric; || 0.99.into()),
1348    "The maximum rate at which statements may be logged. If this value is less than \
1349        that of `statement_logging_sample_rate`, the latter is ignored (Materialize).",
1350    true,
1351)
1352.with_constraint(&NUMERIC_BOUNDED_0_1_INCLUSIVE);
1353
1354pub static STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE: VarDefinition = VarDefinition::new_lazy(
1355    "statement_logging_default_sample_rate",
1356    lazy_value!(Numeric; || 0.99.into()),
1357    "The default value of `statement_logging_sample_rate` for new sessions (Materialize).",
1358    true,
1359)
1360.with_constraint(&NUMERIC_BOUNDED_0_1_INCLUSIVE);
1361
1362pub static ENABLE_INTERNAL_STATEMENT_LOGGING: VarDefinition = VarDefinition::new(
1363    "enable_internal_statement_logging",
1364    value!(bool; false),
1365    "Whether to log statements from the `mz_system` user.",
1366    false,
1367);
1368
1369pub static AUTO_ROUTE_CATALOG_QUERIES: VarDefinition = VarDefinition::new(
1370    "auto_route_catalog_queries",
1371    value!(bool; true),
1372    "Whether to force queries that depend only on system tables, to run on the mz_catalog_server cluster (Materialize).",
1373    true,
1374);
1375
1376pub static MAX_CONNECTIONS: VarDefinition = VarDefinition::new(
1377    "max_connections",
1378    value!(u32; 5000),
1379    "The maximum number of concurrent connections (PostgreSQL).",
1380    true,
1381);
1382
1383pub static SUPERUSER_RESERVED_CONNECTIONS: VarDefinition = VarDefinition::new(
1384    "superuser_reserved_connections",
1385    value!(u32; 3),
1386    "The number of connections that are reserved for superusers (PostgreSQL).",
1387    true,
1388);
1389
1390/// Controls [`mz_storage_types::parameters::StorageParameters::keep_n_source_status_history_entries`].
1391pub static KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES: VarDefinition = VarDefinition::new(
1392    "keep_n_source_status_history_entries",
1393    value!(usize; 5),
1394    "On reboot, truncate all but the last n entries per ID in the source_status_history collection (Materialize).",
1395    false,
1396);
1397
1398/// Controls [`mz_storage_types::parameters::StorageParameters::keep_n_sink_status_history_entries`].
1399pub static KEEP_N_SINK_STATUS_HISTORY_ENTRIES: VarDefinition = VarDefinition::new(
1400    "keep_n_sink_status_history_entries",
1401    value!(usize; 5),
1402    "On reboot, truncate all but the last n entries per ID in the sink_status_history collection (Materialize).",
1403    false,
1404);
1405
1406/// Controls [`mz_storage_types::parameters::StorageParameters::keep_n_privatelink_status_history_entries`].
1407pub static KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES: VarDefinition = VarDefinition::new(
1408    "keep_n_privatelink_status_history_entries",
1409    value!(usize; 5),
1410    "On reboot, truncate all but the last n entries per ID in the mz_aws_privatelink_connection_status_history \
1411        collection (Materialize).",
1412    false,
1413);
1414
1415/// Controls [`mz_storage_types::parameters::StorageParameters::replica_status_history_retention_window`].
1416pub static REPLICA_STATUS_HISTORY_RETENTION_WINDOW: VarDefinition = VarDefinition::new(
1417    "replica_status_history_retention_window",
1418    value!(Duration; REPLICA_STATUS_HISTORY_RETENTION_WINDOW_DEFAULT),
1419    "On reboot, truncate up all entries past the retention window in the mz_cluster_replica_status_history \
1420        collection (Materialize).",
1421    false,
1422);
1423
1424pub static ENABLE_STORAGE_SHARD_FINALIZATION: VarDefinition = VarDefinition::new(
1425    "enable_storage_shard_finalization",
1426    value!(bool; true),
1427    "Whether to allow the storage client to finalize shards (Materialize).",
1428    false,
1429);
1430
1431pub static ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE: VarDefinition = VarDefinition::new(
1432    "enable_consolidate_after_union_negate",
1433    value!(bool; true),
1434    "consolidation after Unions that have a Negated input (Materialize).",
1435    true,
1436);
1437
1438pub static DEFAULT_TIMESTAMP_INTERVAL: VarDefinition = VarDefinition::new(
1439    "default_timestamp_interval",
1440    value!(Duration; Duration::from_millis(1000)),
1441    "The interval at which timestamps are assigned to data from sources and tables.",
1442    false,
1443);
1444
1445pub static MIN_TIMESTAMP_INTERVAL: VarDefinition = VarDefinition::new(
1446    "min_timestamp_interval",
1447    value!(Duration; Duration::from_millis(1000)),
1448    "Minimum timestamp interval",
1449    false,
1450);
1451
1452pub static MAX_TIMESTAMP_INTERVAL: VarDefinition = VarDefinition::new(
1453    "max_timestamp_interval",
1454    value!(Duration; Duration::from_millis(1000)),
1455    "Maximum timestamp interval",
1456    false,
1457);
1458
1459pub static WEBHOOK_CONCURRENT_REQUEST_LIMIT: VarDefinition = VarDefinition::new(
1460    "webhook_concurrent_request_limit",
1461    value!(usize; WEBHOOK_CONCURRENCY_LIMIT),
1462    "Maximum number of concurrent requests for appending to a webhook source.",
1463    false,
1464);
1465
1466pub static USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION: VarDefinition = VarDefinition::new(
1467    "user_storage_managed_collections_batch_duration",
1468    value!(Duration; STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT),
1469    "Duration which we'll wait to collect a batch of events for a webhook source.",
1470    false,
1471);
1472
1473// This system var will need to point to the name of an existing network policy
1474// this will be enforced on alter_system_set
1475pub static NETWORK_POLICY: VarDefinition = VarDefinition::new_lazy(
1476    "network_policy",
1477    lazy_value!(String; || "default".to_string()),
1478    "Sets the fallback network policy applied to all users without an explicit policy.",
1479    true,
1480);
1481
1482pub static FORCE_SOURCE_TABLE_SYNTAX: VarDefinition = VarDefinition::new(
1483    "force_source_table_syntax",
1484    value!(bool; false),
1485    "Force use of new source model (CREATE TABLE .. FROM SOURCE) and migrate existing sources",
1486    true,
1487);
1488
1489pub static OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD: VarDefinition = VarDefinition::new(
1490    "optimizer_e2e_latency_warning_threshold",
1491    value!(Duration; Duration::from_millis(500)),
1492    "Sets the duration that a query can take to compile; queries that take longer \
1493        will trigger a warning. If this value is specified without units, it is taken as \
1494        milliseconds. A value of zero disables the timeout (Materialize).",
1495    true,
1496);
1497
1498/// Configuration for gRPC client connections.
1499pub mod grpc_client {
1500    use super::*;
1501
1502    pub static CONNECT_TIMEOUT: VarDefinition = VarDefinition::new(
1503        "grpc_client_connect_timeout",
1504        value!(Duration; Duration::from_secs(5)),
1505        "Timeout to apply to initial gRPC client connection establishment.",
1506        false,
1507    );
1508
1509    pub static HTTP2_KEEP_ALIVE_INTERVAL: VarDefinition = VarDefinition::new(
1510        "grpc_client_http2_keep_alive_interval",
1511        value!(Duration; Duration::from_secs(3)),
1512        "Idle time to wait before sending HTTP/2 PINGs to maintain established gRPC client connections.",
1513        false,
1514    );
1515
1516    pub static HTTP2_KEEP_ALIVE_TIMEOUT: VarDefinition = VarDefinition::new(
1517        "grpc_client_http2_keep_alive_timeout",
1518        value!(Duration; Duration::from_secs(60)),
1519        "Time to wait for HTTP/2 pong response before terminating a gRPC client connection.",
1520        false,
1521    );
1522}
1523
1524/// Configuration for how cluster replicas are scheduled.
1525pub mod cluster_scheduling {
1526    use super::*;
1527    use mz_orchestrator::scheduling_config::*;
1528
1529    pub static CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT: VarDefinition = VarDefinition::new(
1530        "cluster_multi_process_replica_az_affinity_weight",
1531        value!(Option<i32>; DEFAULT_POD_AZ_AFFINITY_WEIGHT),
1532        "Whether or not to add an availability zone affinity between instances of \
1533            multi-process replicas. Either an affinity weight or empty (off) (Materialize).",
1534        false,
1535    );
1536
1537    pub static CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY: VarDefinition = VarDefinition::new(
1538        "cluster_soften_replication_anti_affinity",
1539        value!(bool; DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY),
1540        "Whether or not to turn the node-scope anti affinity between replicas \
1541            in the same cluster into a preference (Materialize).",
1542        false,
1543    );
1544
1545    pub static CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT: VarDefinition = VarDefinition::new(
1546        "cluster_soften_replication_anti_affinity_weight",
1547        value!(i32; DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT),
1548        "The preference weight for `cluster_soften_replication_anti_affinity` (Materialize).",
1549        false,
1550    );
1551
1552    pub static CLUSTER_ENABLE_TOPOLOGY_SPREAD: VarDefinition = VarDefinition::new(
1553        "cluster_enable_topology_spread",
1554        value!(bool; DEFAULT_TOPOLOGY_SPREAD_ENABLED),
1555        "Whether or not to add topology spread constraints among replicas in the same cluster (Materialize).",
1556        false,
1557    );
1558
1559    pub static CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE: VarDefinition =
1560        VarDefinition::new(
1561            "cluster_topology_spread_ignore_non_singular_scale",
1562            value!(bool; DEFAULT_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE),
1563            "If true, ignore replicas with more than 1 process when adding topology spread constraints (Materialize).",
1564            false,
1565        );
1566
1567    pub static CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW: VarDefinition = VarDefinition::new(
1568        "cluster_topology_spread_max_skew",
1569        value!(i32; DEFAULT_TOPOLOGY_SPREAD_MAX_SKEW),
1570        "The `maxSkew` for replica topology spread constraints (Materialize).",
1571        false,
1572    );
1573
1574    // `minDomains`, like maxSkew, is used to spread across a topology
1575    // key. Unlike max skew, minDomains will force node creation to ensure
1576    // distribution across a minimum number of keys.
1577    // https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/#spread-constraint-definition
1578    pub static CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS: VarDefinition = VarDefinition::new(
1579        "cluster_topology_spread_min_domains",
1580        value!(Option<i32>; None),
1581        "`minDomains` for replica topology spread constraints. \
1582            Should be set to the number of Availability Zones (Materialize).",
1583        false,
1584    );
1585
1586    pub static CLUSTER_TOPOLOGY_SPREAD_SOFT: VarDefinition = VarDefinition::new(
1587        "cluster_topology_spread_soft",
1588        value!(bool; DEFAULT_TOPOLOGY_SPREAD_SOFT),
1589        "If true, soften the topology spread constraints for replicas (Materialize).",
1590        false,
1591    );
1592
1593    pub static CLUSTER_SOFTEN_AZ_AFFINITY: VarDefinition = VarDefinition::new(
1594        "cluster_soften_az_affinity",
1595        value!(bool; DEFAULT_SOFTEN_AZ_AFFINITY),
1596        "Whether or not to turn the az-scope node affinity for replicas. \
1597            Note this could violate requests from the user (Materialize).",
1598        false,
1599    );
1600
1601    pub static CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT: VarDefinition = VarDefinition::new(
1602        "cluster_soften_az_affinity_weight",
1603        value!(i32; DEFAULT_SOFTEN_AZ_AFFINITY_WEIGHT),
1604        "The preference weight for `cluster_soften_az_affinity` (Materialize).",
1605        false,
1606    );
1607
1608    const DEFAULT_CLUSTER_ALTER_CHECK_READY_INTERVAL: Duration = Duration::from_secs(3);
1609
1610    pub static CLUSTER_ALTER_CHECK_READY_INTERVAL: VarDefinition = VarDefinition::new(
1611        "cluster_alter_check_ready_interval",
1612        value!(Duration; DEFAULT_CLUSTER_ALTER_CHECK_READY_INTERVAL),
1613        "How often to poll readiness checks for cluster alter",
1614        false,
1615    );
1616
1617    const DEFAULT_CHECK_SCHEDULING_POLICIES_INTERVAL: Duration = Duration::from_secs(3);
1618
1619    pub static CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL: VarDefinition = VarDefinition::new(
1620        "cluster_check_scheduling_policies_interval",
1621        value!(Duration; DEFAULT_CHECK_SCHEDULING_POLICIES_INTERVAL),
1622        "How often policies are invoked to automatically start/stop clusters, e.g., \
1623            for REFRESH EVERY materialized views.",
1624        false,
1625    );
1626
1627    pub static CLUSTER_SECURITY_CONTEXT_ENABLED: VarDefinition = VarDefinition::new(
1628        "cluster_security_context_enabled",
1629        value!(bool; DEFAULT_SECURITY_CONTEXT_ENABLED),
1630        "Enables SecurityContext for clusterd instances, restricting capabilities to improve security.",
1631        false,
1632    );
1633
1634    const DEFAULT_CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE: Duration = Duration::from_secs(1200);
1635
1636    pub static CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE: VarDefinition = VarDefinition::new(
1637        "cluster_refresh_mv_compaction_estimate",
1638        value!(Duration; DEFAULT_CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE),
1639        "How much time to wait for compaction after a REFRESH MV completes a refresh \
1640            before turning off the refresh cluster. This is needed because Persist does compaction \
1641            only after a write, but refresh MVs do writes only at their refresh times. \
1642            (In the long term, we'd like to remove this configuration and instead wait exactly \
1643            until compaction has settled. We'd need some new Persist API for this.)",
1644        false,
1645    );
1646}
1647
1648/// Macro to simplify creating feature flags, i.e. boolean flags that we use to toggle the
1649/// availability of features.
1650///
1651/// The arguments to `feature_flags!` are:
1652/// - `$name`, which will be the name of the feature flag, in snake_case
1653/// - `$feature_desc`, a human-readable description of the feature
1654/// - `$value`, which if not provided, defaults to `false`
1655///
1656/// Note that not all `VarDefinition<bool>` are feature flags. Feature flags are for variables that:
1657/// - Belong to `SystemVars`, _not_ `SessionVars`
1658/// - Default to false and must be explicitly enabled, or default to `true` and can be explicitly disabled.
1659///
1660/// WARNING / CONTRACT: Syntax-related feature flags must always *enable* behavior. In other words,
1661/// setting a feature flag must make the system more permissive. For example, let's suppose we'd like
1662/// to gate deprecated upsert syntax behind a feature flag. In this case, do not add a feature flag
1663/// like `disable_deprecated_upsert_syntax`, as `disable_deprecated_upsert_syntax = on` would
1664/// _prevent_ the system from parsing the deprecated upsert syntax. Instead, use a feature flag
1665/// like `enable_deprecated_upsert_syntax`.
1666///
1667/// The hazard this protects against is related to reboots after feature flags have been disabled.
1668/// Say someone creates a Kinesis source while `enable_kinesis_sources = on`. Materialize will
1669/// commit this source to the system catalog. Then, suppose we discover a catastrophic bug in
1670/// Kinesis sources and set `enable_kinesis_sources` to `off`. This prevents users from creating
1671/// new Kinesis sources, but leaves the existing Kinesis sources in place. This is because
1672/// disabling a feature flag doesn't remove access to catalog objects created while the feature
1673/// flag was live. On the next reboot, Materialize will proceed to load the Kinesis source from the
1674/// catalog, reparsing and replanning the `CREATE SOURCE` definition and rechecking the
1675/// `enable_kinesis_sources` feature flag along the way. Even though the feature flag has been
1676/// switched to `off`, we need to temporarily re-enable it during parsing and planning to be able
1677/// to boot successfully.
1678///
1679/// Ensuring that all syntax-related feature flags *enable* behavior means that setting all such
1680/// feature flags to `on` during catalog boot has the desired effect.
1681macro_rules! feature_flags {
1682    // Match `$name, $feature_desc, $value`.
1683    (@inner
1684        // The feature flag name.
1685        name: $name:expr,
1686        // The feature flag description.
1687        desc: $desc:literal,
1688        // The feature flag default value.
1689        default: $value:expr,
1690    ) => {
1691        paste::paste!{
1692            // Note that the ServerVar is not directly exported; we expect these to be
1693            // accessible through their FeatureFlag variant.
1694            static [<$name:upper _VAR>]: VarDefinition = VarDefinition::new(
1695                stringify!($name),
1696                value!(bool; $value),
1697                concat!("Whether ", $desc, " is allowed (Materialize)."),
1698                false,
1699            );
1700
1701            pub static [<$name:upper >]: FeatureFlag = FeatureFlag {
1702                flag: &[<$name:upper _VAR>],
1703                feature_desc: $desc,
1704            };
1705        }
1706    };
1707    ($({
1708        // The feature flag name.
1709        name: $name:expr,
1710        // The feature flag description.
1711        desc: $desc:literal,
1712        // The feature flag default value.
1713        default: $value:expr,
1714        // Should the feature be turned on during catalog rehydration when
1715        // parsing a catalog item.
1716        enable_for_item_parsing: $enable_for_item_parsing:expr,
1717    },)+) => {
1718        $(feature_flags! { @inner
1719            name: $name,
1720            desc: $desc,
1721            default: $value,
1722        })+
1723
1724        paste::paste!{
1725            pub static FEATURE_FLAGS: &'static [&'static VarDefinition] = &[
1726                $(  & [<$name:upper _VAR>] , )+
1727            ];
1728        }
1729
1730        paste::paste!{
1731            impl super::SystemVars {
1732                pub fn enable_all_feature_flags_by_default(&mut self) {
1733                    $(
1734                        self.set_default(stringify!($name), super::VarInput::Flat("on"))
1735                            .expect("setting default value must work");
1736                    )+
1737                }
1738
1739                pub fn enable_for_item_parsing(&mut self) {
1740                    $(
1741                        if $enable_for_item_parsing {
1742                            self.set(stringify!($name), super::VarInput::Flat("on"))
1743                                .expect("setting default value must work");
1744                        }
1745                    )+
1746                }
1747
1748                $(
1749                    pub fn [<$name:lower>](&self) -> bool {
1750                        *self.expect_value(&[<$name:upper _VAR>])
1751                    }
1752                )+
1753            }
1754        }
1755    }
1756}
1757
1758feature_flags!(
1759    // Gates for other feature flags
1760    {
1761        name: allow_real_time_recency,
1762        desc: "real time recency",
1763        default: false,
1764        enable_for_item_parsing: true,
1765    },
1766    // Actual feature flags
1767    {
1768        name: enable_guard_subquery_tablefunc,
1769        desc: "Whether HIR -> MIR lowering should use a new tablefunc to guard subquery sizes",
1770        default: true,
1771        enable_for_item_parsing: false,
1772    },
1773    {
1774        name: enable_binary_date_bin,
1775        desc: "the binary version of date_bin function",
1776        default: false,
1777        enable_for_item_parsing: true,
1778    },
1779    {
1780        name: enable_date_bin_hopping,
1781        desc: "the date_bin_hopping function",
1782        default: false,
1783        enable_for_item_parsing: true,
1784    },
1785    {
1786        name: enable_envelope_debezium_in_subscribe,
1787        desc: "`ENVELOPE DEBEZIUM (KEY (..))`",
1788        default: false,
1789        enable_for_item_parsing: true,
1790    },
1791    {
1792        name: enable_envelope_materialize,
1793        desc: "ENVELOPE MATERIALIZE",
1794        default: false,
1795        enable_for_item_parsing: true,
1796    },
1797    {
1798        name: enable_explain_pushdown,
1799        desc: "EXPLAIN FILTER PUSHDOWN",
1800        default: true,
1801        enable_for_item_parsing: true,
1802    },
1803    {
1804        name: enable_index_options,
1805        desc: "INDEX OPTIONS",
1806        default: false,
1807        enable_for_item_parsing: true,
1808    },
1809    {
1810        name: enable_list_length_max,
1811        desc: "the list_length_max function",
1812        default: false,
1813        enable_for_item_parsing: true,
1814    },
1815    {
1816        name: enable_list_n_layers,
1817        desc: "the list_n_layers function",
1818        default: false,
1819        enable_for_item_parsing: true,
1820    },
1821    {
1822        name: enable_list_remove,
1823        desc: "the list_remove function",
1824        default: false,
1825        enable_for_item_parsing: true,
1826    },
1827    {
1828
1829        name: enable_logical_compaction_window,
1830        desc: "RETAIN HISTORY",
1831        default: false,
1832        enable_for_item_parsing: true,
1833    },
1834    {
1835        name: enable_primary_key_not_enforced,
1836        desc: "PRIMARY KEY NOT ENFORCED",
1837        default: false,
1838        enable_for_item_parsing: true,
1839    },
1840    {
1841        name: enable_collection_partition_by,
1842        desc: "PARTITION BY",
1843        default: true,
1844        enable_for_item_parsing: true,
1845    },
1846    {
1847        name: enable_multi_worker_storage_persist_sink,
1848        desc: "multi-worker storage persist sink",
1849        default: true,
1850        enable_for_item_parsing: true,
1851    },
1852    {
1853        name: enable_persist_streaming_snapshot_and_fetch,
1854        desc: "use the new streaming consolidate for snapshot_and_fetch",
1855        default: false,
1856        enable_for_item_parsing: true,
1857    },
1858    {
1859        name: enable_persist_streaming_compaction,
1860        desc: "use the new streaming consolidate for compaction",
1861        default: false,
1862        enable_for_item_parsing: true,
1863    },
1864    {
1865        name: enable_raise_statement,
1866        desc: "RAISE statement",
1867        default: false,
1868        enable_for_item_parsing: true,
1869    },
1870    {
1871        name: enable_repeat_row,
1872        desc: "the repeat_row function",
1873        default: false,
1874        enable_for_item_parsing: true,
1875    },
1876    {
1877        name: enable_repeat_row_non_negative,
1878        desc: "the repeat_row_non_negative function",
1879        default: false,
1880        enable_for_item_parsing: true,
1881    },
1882    {
1883        name: enable_replica_targeted_materialized_views,
1884        desc: "replica-targeted materialized views",
1885        default: false,
1886        enable_for_item_parsing: true,
1887    },
1888    {
1889        name: unsafe_enable_table_check_constraint,
1890        desc: "CREATE TABLE with a check constraint",
1891        default: false,
1892        enable_for_item_parsing: true,
1893    },
1894    {
1895        name: unsafe_enable_table_foreign_key,
1896        desc: "CREATE TABLE with a foreign key",
1897        default: false,
1898        enable_for_item_parsing: true,
1899    },
1900    {
1901        name: unsafe_enable_table_keys,
1902        desc: "CREATE TABLE with a primary key or unique constraint",
1903        default: false,
1904        enable_for_item_parsing: true,
1905    },
1906    {
1907        name: unsafe_enable_unorchestrated_cluster_replicas,
1908        desc: "unorchestrated cluster replicas",
1909        default: false,
1910        enable_for_item_parsing: true,
1911    },
1912    {
1913        name: unsafe_enable_unstable_dependencies,
1914        desc: "depending on unstable objects",
1915        default: false,
1916        enable_for_item_parsing: true,
1917    },
1918    {
1919        name: enable_within_timestamp_order_by_in_subscribe,
1920        desc: "`WITHIN TIMESTAMP ORDER BY ..`",
1921        default: false,
1922        enable_for_item_parsing: true,
1923    },
1924    {
1925        name: enable_cardinality_estimates,
1926        desc: "join planning with cardinality estimates",
1927        default: false,
1928        enable_for_item_parsing: false,
1929    },
1930    {
1931        name: enable_connection_validation_syntax,
1932        desc: "CREATE CONNECTION .. WITH (VALIDATE) and VALIDATE CONNECTION syntax",
1933        default: true,
1934        enable_for_item_parsing: true,
1935    },
1936    {
1937        name: enable_kafka_broker_matching_rules,
1938        desc: "MATCHING broker rules in BROKERS for Kafka PrivateLink connections",
1939        default: false,
1940        enable_for_item_parsing: true,
1941    },
1942    {
1943        name: enable_alter_set_cluster,
1944        desc: "ALTER ... SET CLUSTER syntax",
1945        default: false,
1946        enable_for_item_parsing: true,
1947    },
1948    {
1949        name: unsafe_enable_unsafe_functions,
1950        desc: "executing potentially dangerous functions",
1951        default: false,
1952        enable_for_item_parsing: true,
1953    },
1954    {
1955        name: enable_managed_cluster_availability_zones,
1956        desc: "MANAGED, AVAILABILITY ZONES syntax",
1957        default: false,
1958        enable_for_item_parsing: true,
1959    },
1960    {
1961        name: statement_logging_use_reproducible_rng,
1962        desc: "statement logging with reproducible RNG",
1963        default: false,
1964        enable_for_item_parsing: false,
1965    },
1966    {
1967        name: enable_notices_for_index_already_exists,
1968        desc: "emitting notices for IndexAlreadyExists (doesn't affect EXPLAIN)",
1969        default: true,
1970        enable_for_item_parsing: true,
1971    },
1972    {
1973        name: enable_notices_for_index_too_wide_for_literal_constraints,
1974        desc: "emitting notices for IndexTooWideForLiteralConstraints (doesn't affect EXPLAIN)",
1975        default: false,
1976        enable_for_item_parsing: true,
1977    },
1978    {
1979        name: enable_notices_for_index_empty_key,
1980        desc: "emitting notices for indexes with an empty key (doesn't affect EXPLAIN)",
1981        default: true,
1982        enable_for_item_parsing: true,
1983    },
1984    {
1985        name: enable_notices_for_equals_null,
1986        desc: "emitting notices for `= NULL` and `<> NULL` comparisons (doesn't affect EXPLAIN)",
1987        default: true,
1988        enable_for_item_parsing: true,
1989    },
1990    {
1991        name: enable_alter_swap,
1992        desc: "the ALTER SWAP feature for objects",
1993        default: true,
1994        enable_for_item_parsing: true,
1995    },
1996    {
1997        name: enable_new_outer_join_lowering,
1998        desc: "new outer join lowering",
1999        default: true,
2000        enable_for_item_parsing: false,
2001    },
2002    {
2003        name: enable_time_at_time_zone,
2004        desc: "use of AT TIME ZONE or timezone() with time type",
2005        default: false,
2006        enable_for_item_parsing: true,
2007    },
2008    {
2009        name: enable_load_generator_counter,
2010        desc: "Create a LOAD GENERATOR COUNTER",
2011        default: false,
2012        enable_for_item_parsing: true,
2013    },
2014    {
2015        name: enable_load_generator_clock,
2016        desc: "Create a LOAD GENERATOR CLOCK",
2017        default: false,
2018        enable_for_item_parsing: true,
2019    },
2020    {
2021        name: enable_load_generator_datums,
2022        desc: "Create a LOAD GENERATOR DATUMS",
2023        default: false,
2024        enable_for_item_parsing: true,
2025    },
2026    {
2027        name: enable_load_generator_key_value,
2028        desc: "Create a LOAD GENERATOR KEY VALUE",
2029        default: false,
2030        enable_for_item_parsing: true,
2031    },
2032    {
2033        name: enable_expressions_in_limit_syntax,
2034        desc: "LIMIT <expr> syntax",
2035        default: true,
2036        enable_for_item_parsing: true,
2037    },
2038    {
2039        name: enable_mz_notices,
2040        desc: "Populate the contents of `mz_internal.mz_notices`",
2041        default: true,
2042        enable_for_item_parsing: false,
2043    },
2044    {
2045        name: enable_eager_delta_joins,
2046        desc:
2047            "eager delta joins",
2048        default: false,
2049        enable_for_item_parsing: false,
2050    },
2051    {
2052        name: enable_off_thread_optimization,
2053        desc: "use off-thread optimization in `CREATE` statements",
2054        default: true,
2055        enable_for_item_parsing: false,
2056    },
2057    {
2058        name: enable_refresh_every_mvs,
2059        desc: "REFRESH EVERY and REFRESH AT materialized views",
2060        default: false,
2061        enable_for_item_parsing: true,
2062    },
2063    {
2064        name: enable_cluster_schedule_refresh,
2065        desc: "`SCHEDULE = ON REFRESH` cluster option",
2066        default: false,
2067        enable_for_item_parsing: true,
2068    },
2069    {
2070        name: enable_reduce_mfp_fusion,
2071        desc: "fusion of MFPs in reductions",
2072        default: true,
2073        enable_for_item_parsing: false,
2074    },
2075    {
2076        name: enable_worker_core_affinity,
2077        desc: "set core affinity for replica worker threads",
2078        default: false,
2079        enable_for_item_parsing: false,
2080    },
2081    {
2082        name: enable_storage_introspection_logs,
2083        desc: "forward storage timely logging events into compute's introspection dataflow",
2084        default: false,
2085        enable_for_item_parsing: false,
2086    },
2087    {
2088        name: enable_copy_to_expr,
2089        desc: "COPY ... TO 's3://...'",
2090        default: true,
2091        enable_for_item_parsing: false,
2092    },
2093    {
2094        name: enable_session_timelines,
2095        desc: "strong session serializable isolation levels",
2096        default: false,
2097        enable_for_item_parsing: false,
2098    },
2099    {
2100        name: enable_variadic_left_join_lowering,
2101        desc: "Enable joint HIR ⇒ MIR lowering of stacks of left joins",
2102        default: true,
2103        enable_for_item_parsing: false,
2104    },
2105    {
2106        name: enable_redacted_test_option,
2107        desc: "Enable useless option to test value redaction",
2108        default: false,
2109        enable_for_item_parsing: true,
2110    },
2111    {
2112        name: enable_letrec_fixpoint_analysis,
2113        desc: "Enable Lattice-based fixpoint iteration on LetRec nodes in the Analysis framework",
2114        default: true, // This is just a failsafe switch for the deployment of materialize#25591.
2115        enable_for_item_parsing: false,
2116    },
2117    {
2118        name: enable_kafka_sink_headers,
2119        desc: "Enable the HEADERS option for Kafka sinks",
2120        default: false,
2121        enable_for_item_parsing: true,
2122    },
2123    {
2124        name: enable_unlimited_retain_history,
2125        desc: "Disable limits on RETAIN HISTORY (below 1s default, and 0 disables compaction).",
2126        default: false,
2127        enable_for_item_parsing: true,
2128    },
2129    {
2130        name: enable_envelope_upsert_inline_errors,
2131        desc: "The VALUE DECODING ERRORS = INLINE option on ENVELOPE UPSERT",
2132        default: true,
2133        enable_for_item_parsing: true,
2134    },
2135    {
2136        name: enable_alter_table_add_column,
2137        desc: "Enable ALTER TABLE ... ADD COLUMN ...",
2138        default: false,
2139        enable_for_item_parsing: false,
2140    },
2141    {
2142        name: enable_zero_downtime_cluster_reconfiguration,
2143        desc: "Enable zero-downtime reconfiguration for alter cluster",
2144        default: false,
2145        enable_for_item_parsing: false,
2146    },
2147    {
2148        name: enable_aws_msk_iam_auth,
2149        desc: "Enable AWS MSK IAM authentication for Kafka connections",
2150        default: true,
2151        enable_for_item_parsing: true,
2152    },
2153    {
2154        name: enable_network_policies,
2155        desc: "ENABLE NETWORK POLICIES",
2156        default: true,
2157        enable_for_item_parsing: true,
2158    },
2159    {
2160        name: enable_create_table_from_source,
2161        desc: "Whether to allow CREATE TABLE .. FROM SOURCE syntax.",
2162        default: false,
2163        enable_for_item_parsing: true,
2164    },
2165    {
2166        name: enable_copy_from_remote,
2167        desc: "Whether to allow COPY FROM <url>.",
2168        default: true,
2169        enable_for_item_parsing: false,
2170    },
2171    {
2172        name: enable_join_prioritize_arranged,
2173        desc: "Whether join planning should prioritize already-arranged keys over keys with more fields.",
2174        default: false,
2175        enable_for_item_parsing: false,
2176    },
2177    {
2178        name: enable_sql_server_source,
2179        desc: "Creating a SQL SERVER source",
2180        default: true,
2181        enable_for_item_parsing: false,
2182    },
2183    {
2184        name: enable_projection_pushdown_after_relation_cse,
2185        desc: "Run ProjectionPushdown one more time after the last RelationCSE.",
2186        default: true,
2187        enable_for_item_parsing: false,
2188    },
2189    {
2190        name: enable_less_reduce_in_eqprop,
2191        desc: "Run MSE::reduce in EquivalencePropagation only if reduce_expr changed something.",
2192        default: true,
2193        enable_for_item_parsing: false,
2194    },
2195    {
2196        name: enable_dequadratic_eqprop_map,
2197        desc: "Skip the quadratic part of EquivalencePropagation's handling of Map.",
2198        default: true,
2199        enable_for_item_parsing: false,
2200    },
2201    {
2202        name: enable_eq_classes_withholding_errors,
2203        desc: "Use `EquivalenceClassesWithholdingErrors` instead of raw `EquivalenceClasses` during eq prop for joins.",
2204        default: true,
2205        enable_for_item_parsing: false,
2206    },
2207    {
2208        name: enable_fast_path_plan_insights,
2209        desc: "Enables those plan insight notices that help with getting fast path queries. Don't turn on before #9492 is fixed!",
2210        default: false,
2211        enable_for_item_parsing: false,
2212    },
2213    {
2214        name: enable_with_ordinality_legacy_fallback,
2215        desc: "When the new WITH ORDINALITY implementation can't be used with a table func, whether to fall back to the legacy implementation or error out.",
2216        default: false,
2217        enable_for_item_parsing: true,
2218    },
2219    {
2220        name: enable_iceberg_sink,
2221        desc: "Whether to enable the Iceberg sink.",
2222        default: true,
2223        enable_for_item_parsing: true,
2224    },
2225    {
2226        name: enable_frontend_peek_sequencing, // currently, changes only take effect for new sessions
2227        desc: "Enables the new peek sequencing code, which does most of its work in the Adapter Frontend instead of the Coordinator main task.",
2228        default: true,
2229        enable_for_item_parsing: false,
2230    },
2231    {
2232        name: enable_replacement_materialized_views,
2233        desc: "Whether to enable replacement materialized views.",
2234        default: true,
2235        enable_for_item_parsing: true,
2236    },
2237    {
2238        name: enable_cast_elimination,
2239        desc: "Allow the optimizer to eliminate noop casts between values of equivalent representation types.",
2240        default: true,
2241        enable_for_item_parsing: false,
2242    },
2243    {
2244        // Just an escape hatch for the unlikely case that we have some user who is doing such
2245        // queries. Can be removed after one week in prod.
2246        // https://github.com/MaterializeInc/database-issues/issues/10004
2247        name: disallow_unmaterializable_functions_as_of,
2248        desc: "Prohibits calling unmaterializable functions (except `mz_now`) in AS OF queries.",
2249        default: true,
2250        enable_for_item_parsing: false,
2251    },
2252    {
2253        name: enable_case_literal_transform,
2254        desc: "Allow the optimizer to rewrite If-chains matching a single expression against literals into a CaseLiteral lookup.",
2255        default: false,
2256        enable_for_item_parsing: false,
2257    },
2258    {
2259        name: enable_simplify_quantified_comparisons,
2260        desc: "Allow the optimizer to simplify quantified comparisons in JOIN ON clauses into semi/anti-join EXISTS form during HIR-to-MIR lowering.",
2261        default: true,
2262        enable_for_item_parsing: false,
2263    },
2264    {
2265        name: enable_coalesce_case_transform,
2266        desc: "Allow the optimizer to push `COALESCE` into `CASE WHEN`.",
2267        default: true,
2268        enable_for_item_parsing: false,
2269    },
2270);
2271
2272impl From<&super::SystemVars> for OptimizerFeatures {
2273    fn from(vars: &super::SystemVars) -> Self {
2274        Self {
2275            enable_guard_subquery_tablefunc: vars.enable_guard_subquery_tablefunc(),
2276            enable_consolidate_after_union_negate: vars.enable_consolidate_after_union_negate(),
2277            enable_eager_delta_joins: vars.enable_eager_delta_joins(),
2278            enable_new_outer_join_lowering: vars.enable_new_outer_join_lowering(),
2279            enable_reduce_mfp_fusion: vars.enable_reduce_mfp_fusion(),
2280            enable_variadic_left_join_lowering: vars.enable_variadic_left_join_lowering(),
2281            enable_letrec_fixpoint_analysis: vars.enable_letrec_fixpoint_analysis(),
2282            enable_cardinality_estimates: vars.enable_cardinality_estimates(),
2283            persist_fast_path_limit: vars.persist_fast_path_limit(),
2284            reoptimize_imported_views: false,
2285            enable_join_prioritize_arranged: vars.enable_join_prioritize_arranged(),
2286            enable_projection_pushdown_after_relation_cse: vars
2287                .enable_projection_pushdown_after_relation_cse(),
2288            enable_less_reduce_in_eqprop: vars.enable_less_reduce_in_eqprop(),
2289            enable_dequadratic_eqprop_map: vars.enable_dequadratic_eqprop_map(),
2290            enable_eq_classes_withholding_errors: vars.enable_eq_classes_withholding_errors(),
2291            enable_fast_path_plan_insights: vars.enable_fast_path_plan_insights(),
2292            enable_cast_elimination: vars.enable_cast_elimination(),
2293            enable_case_literal_transform: vars.enable_case_literal_transform(),
2294            enable_simplify_quantified_comparisons: vars.enable_simplify_quantified_comparisons(),
2295            enable_coalesce_case_transform: vars.enable_coalesce_case_transform(),
2296        }
2297    }
2298}
2299
2300#[cfg(test)]
2301mod tests {
2302    use super::*;
2303    use crate::session::vars::SystemVars;
2304
2305    /// Ensure that all vars used for optimizer features have `enable_for_item_parsing = false`.
2306    ///
2307    /// This is important to ensure that plan caching works as intended during item parsing. Cached
2308    /// plans include the optimizer features they were produced with, and if they don't match on
2309    /// lookup, that results in a cache miss.
2310    #[mz_ore::test]
2311    fn optimizer_features_no_enable_for_item_parsing() {
2312        // Construct a `SystemVars` where all optimizer features are `false`.
2313        //
2314        // We do this in a roundabout way, by first constructing all-false `OptimizerFeatures` and
2315        // then assigning them to their respective system vars, to ensure we don't forget to update
2316        // this test when new optimizer features are added.
2317        let false_features = OptimizerFeatures::default();
2318        let OptimizerFeatures {
2319            enable_eq_classes_withholding_errors,
2320            enable_guard_subquery_tablefunc,
2321            enable_consolidate_after_union_negate,
2322            enable_eager_delta_joins,
2323            enable_letrec_fixpoint_analysis,
2324            enable_new_outer_join_lowering,
2325            enable_reduce_mfp_fusion,
2326            enable_variadic_left_join_lowering,
2327            enable_cardinality_estimates,
2328            persist_fast_path_limit,
2329            reoptimize_imported_views,
2330            enable_join_prioritize_arranged,
2331            enable_projection_pushdown_after_relation_cse,
2332            enable_less_reduce_in_eqprop,
2333            enable_dequadratic_eqprop_map,
2334            enable_fast_path_plan_insights,
2335            enable_cast_elimination,
2336            enable_case_literal_transform,
2337            enable_simplify_quantified_comparisons,
2338            enable_coalesce_case_transform,
2339        } = false_features;
2340
2341        let mut vars = SystemVars::new();
2342
2343        macro_rules! set_var {
2344            ($var:ident) => {
2345                vars.set(stringify!($var), VarInput::Flat(&$var.to_string()))
2346                    .unwrap();
2347            };
2348        }
2349
2350        set_var!(enable_eq_classes_withholding_errors);
2351        set_var!(enable_guard_subquery_tablefunc);
2352        set_var!(enable_consolidate_after_union_negate);
2353        set_var!(enable_eager_delta_joins);
2354        set_var!(enable_letrec_fixpoint_analysis);
2355        set_var!(enable_new_outer_join_lowering);
2356        set_var!(enable_reduce_mfp_fusion);
2357        set_var!(enable_variadic_left_join_lowering);
2358        set_var!(enable_cardinality_estimates);
2359        set_var!(persist_fast_path_limit);
2360        let _ = reoptimize_imported_views; // no corresponding var
2361        set_var!(enable_join_prioritize_arranged);
2362        set_var!(enable_projection_pushdown_after_relation_cse);
2363        set_var!(enable_less_reduce_in_eqprop);
2364        set_var!(enable_dequadratic_eqprop_map);
2365        set_var!(enable_fast_path_plan_insights);
2366        set_var!(enable_cast_elimination);
2367        set_var!(enable_case_literal_transform);
2368        set_var!(enable_simplify_quantified_comparisons);
2369        set_var!(enable_coalesce_case_transform);
2370
2371        // Enable for item parsing, then ensure we still get the same optimizer features.
2372        vars.enable_for_item_parsing();
2373        let features_for_item_parsing = OptimizerFeatures::from(&vars);
2374        assert_eq!(features_for_item_parsing, false_features);
2375    }
2376}