Skip to main content

mz_sql/session/vars/
definitions.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Cow;
11use std::num::NonZeroU32;
12use std::str::FromStr;
13use std::sync::Arc;
14use std::sync::LazyLock;
15use std::time::Duration;
16
17use chrono::{DateTime, Utc};
18use derivative::Derivative;
19use mz_adapter_types::timestamp_oracle::{
20    DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT,
21    DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER,
22};
23use mz_ore::cast::{self, CastFrom};
24use mz_repr::adt::numeric::Numeric;
25use mz_repr::adt::timestamp::CheckedTimestamp;
26use mz_repr::bytes::ByteSize;
27use mz_repr::optimize::OptimizerFeatures;
28use mz_sql_parser::ast::Ident;
29use mz_sql_parser::ident;
30use mz_storage_types::parameters::REPLICA_STATUS_HISTORY_RETENTION_WINDOW_DEFAULT;
31use mz_storage_types::parameters::{
32    DEFAULT_PG_SOURCE_CONNECT_TIMEOUT, DEFAULT_PG_SOURCE_TCP_CONFIGURE_SERVER,
33    DEFAULT_PG_SOURCE_TCP_KEEPALIVES_IDLE, DEFAULT_PG_SOURCE_TCP_KEEPALIVES_INTERVAL,
34    DEFAULT_PG_SOURCE_TCP_KEEPALIVES_RETRIES, DEFAULT_PG_SOURCE_TCP_USER_TIMEOUT,
35    DEFAULT_PG_SOURCE_WAL_SENDER_TIMEOUT, STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT,
36};
37use mz_tracing::{CloneableEnvFilter, SerializableDirective};
38use uncased::UncasedStr;
39
40use crate::session::user::{SUPPORT_USER, SYSTEM_USER, User};
41use crate::session::vars::constraints::{
42    BYTESIZE_AT_LEAST_1MB, DomainConstraint, NUMERIC_BOUNDED_0_1_INCLUSIVE, NUMERIC_NON_NEGATIVE,
43    ValueConstraint,
44};
45use crate::session::vars::errors::VarError;
46use crate::session::vars::polyfill::{LazyValueFn, lazy_value, value};
47use crate::session::vars::value::{
48    ClientEncoding, ClientSeverity, DEFAULT_DATE_STYLE, Failpoints, IntervalStyle, IsolationLevel,
49    TimeZone, Value,
50};
51use crate::session::vars::{FeatureFlag, Var, VarInput, VarParseError};
52use crate::{DEFAULT_SCHEMA, WEBHOOK_CONCURRENCY_LIMIT};
53
/// Definition of a variable.
///
/// Bundles a variable's name, description, visibility, compiled-in default value, optional
/// constraint, and optional feature-flag requirement, together with two type-erased function
/// pointers (`parse` and `type_name`) that are captured from the concrete [`Value`] type when
/// the definition is constructed.
#[derive(Clone, Derivative)]
#[derivative(Debug)]
pub struct VarDefinition {
    /// Name of the variable, case-insensitive matching.
    pub name: &'static UncasedStr,
    /// Description of the variable.
    pub description: &'static str,
    /// Whether the variable is visible to all users. When `false`, the visibility check in the
    /// [`Var`] implementation only passes for `SYSTEM_USER` and `SUPPORT_USER`.
    pub user_visible: bool,

    /// Default compiled in value for this variable.
    pub value: VarDefaultValue,
    /// Constraint that must be upheld for this variable to be valid. `None` means the
    /// definition carries no constraint.
    pub constraint: Option<ValueConstraint>,
    /// When set, prevents getting or setting the variable unless the specified
    /// feature flag is enabled.
    pub require_feature_flag: Option<&'static FeatureFlag>,

    /// Method to parse [`VarInput`] into a type that implements [`Value`].
    ///
    /// The reason `parse` exists as a function pointer is because we want to achieve two things:
    ///   1. `VarDefinition` has no generic parameters.
    ///   2. `Value::parse` returns an instance of `Self`.
    /// `VarDefinition` holds a `dyn Value`, but `Value::parse` is not object safe because it
    /// returns `Self`, so we can't call that method. We could change `Value::parse` to return a
    /// `Box<dyn Value>` making it object safe, but that creates a footgun where it's possible for
    /// `Value::parse` to return a type that isn't `Self`, e.g. `<String as Value>::parse` could
    /// return a `usize`!
    ///
    /// So to prevent making `VarDefinition` generic over some type `V: Value`, but also defining
    /// `Value::parse` as returning `Self`, we store a static function pointer to the `parse`
    /// implementation of our default value.
    ///
    /// Excluded from the derived `Debug` output.
    #[derivative(Debug = "ignore")]
    parse: fn(VarInput) -> Result<Box<dyn Value>, VarParseError>,
    /// Returns a human readable name for the type of this variable. We store this as a static
    /// function pointer for the same reason as `parse`.
    ///
    /// Excluded from the derived `Debug` output.
    #[derivative(Debug = "ignore")]
    type_name: fn() -> Cow<'static, str>,
}
// Compile-time check that `VarDefinition` implements both `Send` and `Sync`.
static_assertions::assert_impl_all!(VarDefinition: Send, Sync);
95
96impl VarDefinition {
97    /// Create a new [`VarDefinition`] in a const context with a value known at compile time.
98    pub const fn new<V: Value>(
99        name: &'static str,
100        value: &'static V,
101        description: &'static str,
102        user_visible: bool,
103    ) -> Self {
104        VarDefinition {
105            name: UncasedStr::new(name),
106            description,
107            value: VarDefaultValue::Static(value),
108            user_visible,
109            parse: V::parse_dyn_value,
110            type_name: V::type_name,
111            constraint: None,
112            require_feature_flag: None,
113        }
114    }
115
116    /// Create a new [`VarDefinition`] in a const context with a lazily evaluated value.
117    pub const fn new_lazy<V: Value, L: LazyValueFn<V>>(
118        name: &'static str,
119        _value: L,
120        description: &'static str,
121        user_visible: bool,
122    ) -> Self {
123        VarDefinition {
124            name: UncasedStr::new(name),
125            description,
126            value: VarDefaultValue::Lazy(L::LAZY_VALUE_FN),
127            user_visible,
128            parse: V::parse_dyn_value,
129            type_name: V::type_name,
130            constraint: None,
131            require_feature_flag: None,
132        }
133    }
134
135    /// Create a new [`VarDefinition`] with a value known at runtime.
136    pub fn new_runtime<V: Value>(
137        name: &'static str,
138        value: V,
139        description: &'static str,
140        user_visible: bool,
141    ) -> Self {
142        VarDefinition {
143            name: UncasedStr::new(name),
144            description,
145            value: VarDefaultValue::Runtime(Arc::new(value)),
146            user_visible,
147            parse: V::parse_dyn_value,
148            type_name: V::type_name,
149            constraint: None,
150            require_feature_flag: None,
151        }
152    }
153
154    /// TODO(parkmycar): Refactor this method onto a `VarDefinitionBuilder` that would allow us to
155    /// constrain `V` here to be the same `V` used in [`VarDefinition::new`].
156    pub const fn with_constraint<V: Value, D: DomainConstraint<Value = V>>(
157        mut self,
158        constraint: &'static D,
159    ) -> Self {
160        self.constraint = Some(ValueConstraint::Domain(constraint));
161        self
162    }
163
164    pub const fn fixed(mut self) -> Self {
165        self.constraint = Some(ValueConstraint::Fixed);
166        self
167    }
168
169    pub const fn read_only(mut self) -> Self {
170        self.constraint = Some(ValueConstraint::ReadOnly);
171        self
172    }
173
174    pub const fn with_feature_flag(mut self, feature_flag: &'static FeatureFlag) -> Self {
175        self.require_feature_flag = Some(feature_flag);
176        self
177    }
178
179    pub fn parse(&self, input: VarInput) -> Result<Box<dyn Value>, VarError> {
180        (self.parse)(input).map_err(|err| err.into_var_error(self))
181    }
182
183    pub fn default_value(&self) -> &'_ dyn Value {
184        self.value.value()
185    }
186}
187
188impl Var for VarDefinition {
189    fn name(&self) -> &'static str {
190        self.name.as_str()
191    }
192
193    fn value(&self) -> String {
194        self.default_value().format()
195    }
196
197    fn description(&self) -> &'static str {
198        self.description
199    }
200
201    fn type_name(&self) -> Cow<'static, str> {
202        (self.type_name)()
203    }
204
205    fn visible(&self, user: &User, system_vars: &super::SystemVars) -> Result<(), VarError> {
206        if !self.user_visible && user != &*SYSTEM_USER && user != &*SUPPORT_USER {
207            Err(VarError::UnknownParameter(self.name().to_string()))
208        } else if self.is_unsafe() && !system_vars.allow_unsafe() {
209            Err(VarError::RequiresUnsafeMode(self.name()))
210        } else {
211            if let Some(flag) = self.require_feature_flag {
212                flag.require(system_vars)?;
213            }
214
215            Ok(())
216        }
217    }
218}
219
/// The kinds of compiled in default values that can be used with [`VarDefinition`].
///
/// Each variant ultimately yields a `&dyn Value` through [`VarDefaultValue::value`].
#[derive(Clone, Debug)]
pub enum VarDefaultValue {
    /// Static that can be evaluated at compile time.
    Static(&'static dyn Value),
    /// Lazy value that is defined at compile time, but created at runtime. The function pointer
    /// is called every time the value is requested.
    Lazy(fn() -> &'static dyn Value),
    /// Value created at runtime. Note: This is generally an escape hatch.
    Runtime(Arc<dyn Value>),
}
230
231impl VarDefaultValue {
232    pub fn value(&self) -> &'_ dyn Value {
233        match self {
234            VarDefaultValue::Static(s) => *s,
235            VarDefaultValue::Lazy(l) => (l)(),
236            VarDefaultValue::Runtime(r) => r.as_ref(),
237        }
238    }
239}
240
// We pretend to be Postgres v9.5.0, which is also what CockroachDB pretends to
// be. Too new and some clients will emit a "server too new" warning. Too old
// and some clients will fall back to legacy code paths. v9.5.0 empirically
// seems to be a good compromise.
//
// The three constants below are combined into the defaults of the
// `server_version` and `server_version_num` variables later in this file.

/// The major version of PostgreSQL that Materialize claims to be.
pub const SERVER_MAJOR_VERSION: u8 = 9;

/// The minor version of PostgreSQL that Materialize claims to be.
pub const SERVER_MINOR_VERSION: u8 = 5;

/// The patch version of PostgreSQL that Materialize claims to be.
pub const SERVER_PATCH_VERSION: u8 = 0;

/// The name of the default database that Materialize uses. Also the default of the
/// `database` variable.
pub const DEFAULT_DATABASE_NAME: &str = "materialize";
257
/// `application_name`: defaults to the empty string; user-visible.
pub static APPLICATION_NAME: VarDefinition = VarDefinition::new(
    "application_name",
    value!(String; String::new()),
    "Sets the application name to be reported in statistics and logs (PostgreSQL).",
    true,
);

/// `client_encoding`: defaults to `ClientEncoding::Utf8`; user-visible.
pub static CLIENT_ENCODING: VarDefinition = VarDefinition::new(
    "client_encoding",
    value!(ClientEncoding; ClientEncoding::Utf8),
    "Sets the client's character set encoding (PostgreSQL).",
    true,
);

/// `client_min_messages`: defaults to `ClientSeverity::Notice`; user-visible.
pub static CLIENT_MIN_MESSAGES: VarDefinition = VarDefinition::new(
    "client_min_messages",
    value!(ClientSeverity; ClientSeverity::Notice),
    "Sets the message levels that are sent to the client (PostgreSQL).",
    true,
);

/// `cluster`: lazily built default of `"quickstart"`; user-visible.
pub static CLUSTER: VarDefinition = VarDefinition::new_lazy(
    "cluster",
    lazy_value!(String; || "quickstart".to_string()),
    "Sets the current cluster (Materialize).",
    true,
);

/// `cluster_replica`: defaults to `None`; user-visible.
pub static CLUSTER_REPLICA: VarDefinition = VarDefinition::new(
    "cluster_replica",
    value!(Option<String>; None),
    "Sets a target cluster replica for SELECT queries (Materialize).",
    true,
);

/// `current_object_missing_warnings`: defaults to `true`; user-visible.
pub static CURRENT_OBJECT_MISSING_WARNINGS: VarDefinition = VarDefinition::new(
    "current_object_missing_warnings",
    value!(bool; true),
    "Whether to emit warnings when the current database, schema, or cluster is missing (Materialize).",
    true,
);
299
/// `database`: lazily built default of [`DEFAULT_DATABASE_NAME`]; user-visible.
pub static DATABASE: VarDefinition = VarDefinition::new_lazy(
    "database",
    lazy_value!(String; || DEFAULT_DATABASE_NAME.to_string()),
    "Sets the current database (CockroachDB).",
    true,
);

/// `DateStyle`: defaults to `DEFAULT_DATE_STYLE`; user-visible.
pub static DATE_STYLE: VarDefinition = VarDefinition::new(
    // DateStyle has nonstandard capitalization for historical reasons.
    "DateStyle",
    &DEFAULT_DATE_STYLE,
    "Sets the display format for date and time values (PostgreSQL).",
    true,
);

/// `default_cluster_replication_factor`: defaults to `1`; user-visible.
pub static DEFAULT_CLUSTER_REPLICATION_FACTOR: VarDefinition = VarDefinition::new(
    "default_cluster_replication_factor",
    value!(u32; 1),
    "Default cluster replication factor (Materialize).",
    true,
);

/// `extra_float_digits`: defaults to `3`; user-visible.
pub static EXTRA_FLOAT_DIGITS: VarDefinition = VarDefinition::new(
    "extra_float_digits",
    value!(i32; 3),
    "Adjusts the number of digits displayed for floating-point values (PostgreSQL).",
    true,
);

/// `failpoints`: default is the unit-like `Failpoints` value; user-visible.
pub static FAILPOINTS: VarDefinition = VarDefinition::new(
    "failpoints",
    value!(Failpoints; Failpoints),
    "Allows failpoints to be dynamically activated.",
    true,
);

/// `integer_datetimes`: defaults to `true`; user-visible; carries the
/// `ValueConstraint::Fixed` constraint.
pub static INTEGER_DATETIMES: VarDefinition = VarDefinition::new(
    "integer_datetimes",
    value!(bool; true),
    "Reports whether the server uses 64-bit-integer dates and times (PostgreSQL).",
    true,
)
.fixed();

/// `IntervalStyle`: defaults to `IntervalStyle::Postgres`; user-visible.
pub static INTERVAL_STYLE: VarDefinition = VarDefinition::new(
    // IntervalStyle has nonstandard capitalization for historical reasons.
    "IntervalStyle",
    value!(IntervalStyle; IntervalStyle::Postgres),
    "Sets the display format for interval values (PostgreSQL).",
    true,
);
351
/// Case-insensitive name constant for `mz_version`.
pub const MZ_VERSION_NAME: &UncasedStr = UncasedStr::new("mz_version");
/// Case-insensitive name constant for `is_superuser`.
pub const IS_SUPERUSER_NAME: &UncasedStr = UncasedStr::new("is_superuser");

// Schema can be used as an alias for a search path with a single element.
pub const SCHEMA_ALIAS: &UncasedStr = UncasedStr::new("schema");
/// `search_path`: lazily built default of a single-element list containing `DEFAULT_SCHEMA`;
/// user-visible.
pub static SEARCH_PATH: VarDefinition = VarDefinition::new_lazy(
    "search_path",
    lazy_value!(Vec<Ident>; || vec![ident!(DEFAULT_SCHEMA)]),
    "Sets the schema search order for names that are not schema-qualified (PostgreSQL).",
    true,
);
363
/// `statement_timeout`: defaults to 60 seconds; user-visible.
pub static STATEMENT_TIMEOUT: VarDefinition = VarDefinition::new(
    "statement_timeout",
    value!(Duration; Duration::from_secs(60)),
    "Sets the maximum allowed duration of INSERT...SELECT, UPDATE, and DELETE operations. \
    If this value is specified without units, it is taken as milliseconds.",
    true,
);

/// `idle_in_transaction_session_timeout`: defaults to 120 seconds (two minutes); user-visible.
pub static IDLE_IN_TRANSACTION_SESSION_TIMEOUT: VarDefinition = VarDefinition::new(
    "idle_in_transaction_session_timeout",
    value!(Duration; Duration::from_secs(60 * 2)),
    "Sets the maximum allowed duration that a session can sit idle in a transaction before \
    being terminated. If this value is specified without units, it is taken as milliseconds. \
    A value of zero disables the timeout (PostgreSQL).",
    true,
);
380
/// `server_version`: lazily built default of the dotted string
/// `{SERVER_MAJOR_VERSION}.{SERVER_MINOR_VERSION}.{SERVER_PATCH_VERSION}`; user-visible;
/// carries the `ValueConstraint::ReadOnly` constraint.
pub static SERVER_VERSION: VarDefinition = VarDefinition::new_lazy(
    "server_version",
    lazy_value!(String; || {
        format!("{SERVER_MAJOR_VERSION}.{SERVER_MINOR_VERSION}.{SERVER_PATCH_VERSION}")
    }),
    "Shows the PostgreSQL compatible server version (PostgreSQL).",
    true,
)
.read_only();

/// `server_version_num`: the version packed into one integer as
/// `major * 10_000 + minor * 100 + patch`; user-visible; carries the
/// `ValueConstraint::ReadOnly` constraint.
pub static SERVER_VERSION_NUM: VarDefinition = VarDefinition::new(
    "server_version_num",
    value!(i32; (cast::u8_to_i32(SERVER_MAJOR_VERSION) * 10_000)
        + (cast::u8_to_i32(SERVER_MINOR_VERSION) * 100)
        + cast::u8_to_i32(SERVER_PATCH_VERSION)),
    "Shows the PostgreSQL compatible server version as an integer (PostgreSQL).",
    true,
)
.read_only();
400
/// `sql_safe_updates`: defaults to `false`; user-visible.
pub static SQL_SAFE_UPDATES: VarDefinition = VarDefinition::new(
    "sql_safe_updates",
    value!(bool; false),
    "Prohibits SQL statements that may be overly destructive (CockroachDB).",
    true,
);

/// `standard_conforming_strings`: defaults to `true`; user-visible; carries the
/// `ValueConstraint::Fixed` constraint.
pub static STANDARD_CONFORMING_STRINGS: VarDefinition = VarDefinition::new(
    "standard_conforming_strings",
    value!(bool; true),
    "Causes '...' strings to treat backslashes literally (PostgreSQL).",
    true,
)
.fixed();

/// `TimeZone`: defaults to `TimeZone::UTC`; user-visible.
pub static TIMEZONE: VarDefinition = VarDefinition::new(
    // TimeZone has nonstandard capitalization for historical reasons.
    "TimeZone",
    value!(TimeZone; TimeZone::UTC),
    "Sets the time zone for displaying and interpreting time stamps (PostgreSQL).",
    true,
);

/// Name of the `transaction_isolation` variable, exposed separately as a plain string.
pub const TRANSACTION_ISOLATION_VAR_NAME: &str = "transaction_isolation";
/// `transaction_isolation`: defaults to `IsolationLevel::StrictSerializable`; user-visible.
pub static TRANSACTION_ISOLATION: VarDefinition = VarDefinition::new(
    TRANSACTION_ISOLATION_VAR_NAME,
    value!(IsolationLevel; IsolationLevel::StrictSerializable),
    "Sets the current transaction's isolation level (PostgreSQL).",
    true,
);
431
/// `max_kafka_connections`: defaults to `1000`; user-visible.
pub static MAX_KAFKA_CONNECTIONS: VarDefinition = VarDefinition::new(
    "max_kafka_connections",
    value!(u32; 1000),
    "The maximum number of Kafka connections in the region, across all schemas (Materialize).",
    true,
);

/// `max_postgres_connections`: defaults to `1000`; user-visible.
pub static MAX_POSTGRES_CONNECTIONS: VarDefinition = VarDefinition::new(
    "max_postgres_connections",
    value!(u32; 1000),
    "The maximum number of PostgreSQL connections in the region, across all schemas (Materialize).",
    true,
);

/// `max_mysql_connections`: defaults to `1000`; user-visible.
pub static MAX_MYSQL_CONNECTIONS: VarDefinition = VarDefinition::new(
    "max_mysql_connections",
    value!(u32; 1000),
    "The maximum number of MySQL connections in the region, across all schemas (Materialize).",
    true,
);

/// `max_sql_server_connections`: defaults to `1000`; user-visible.
pub static MAX_SQL_SERVER_CONNECTIONS: VarDefinition = VarDefinition::new(
    "max_sql_server_connections",
    value!(u32; 1000),
    "The maximum number of SQL Server connections in the region, across all schemas (Materialize).",
    true,
);

/// `max_aws_privatelink_connections`: defaults to `0`; user-visible.
pub static MAX_AWS_PRIVATELINK_CONNECTIONS: VarDefinition = VarDefinition::new(
    "max_aws_privatelink_connections",
    value!(u32; 0),
    "The maximum number of AWS PrivateLink connections in the region, across all schemas (Materialize).",
    true,
);
466
/// `max_tables`: defaults to `200`; user-visible.
pub static MAX_TABLES: VarDefinition = VarDefinition::new(
    "max_tables",
    value!(u32; 200),
    "The maximum number of tables in the region, across all schemas (Materialize).",
    true,
);

/// `max_sources`: defaults to `200`; user-visible.
pub static MAX_SOURCES: VarDefinition = VarDefinition::new(
    "max_sources",
    value!(u32; 200),
    "The maximum number of sources in the region, across all schemas (Materialize).",
    true,
);

/// `max_sinks`: defaults to `1000`; user-visible.
pub static MAX_SINKS: VarDefinition = VarDefinition::new(
    "max_sinks",
    value!(u32; 1000),
    "The maximum number of sinks in the region, across all schemas (Materialize).",
    true,
);

/// `max_materialized_views`: defaults to `500`; user-visible.
pub static MAX_MATERIALIZED_VIEWS: VarDefinition = VarDefinition::new(
    "max_materialized_views",
    value!(u32; 500),
    "The maximum number of materialized views in the region, across all schemas (Materialize).",
    true,
);

/// `max_clusters`: defaults to `25`; user-visible.
pub static MAX_CLUSTERS: VarDefinition = VarDefinition::new(
    "max_clusters",
    value!(u32; 25),
    "The maximum number of clusters in the region (Materialize).",
    true,
);

/// `max_replicas_per_cluster`: defaults to `5`; user-visible.
pub static MAX_REPLICAS_PER_CLUSTER: VarDefinition = VarDefinition::new(
    "max_replicas_per_cluster",
    value!(u32; 5),
    "The maximum number of replicas of a single cluster (Materialize).",
    true,
);

/// `max_credit_consumption_rate`: lazily built `Numeric` default of `1024`; user-visible;
/// constrained by `NUMERIC_NON_NEGATIVE`.
pub static MAX_CREDIT_CONSUMPTION_RATE: VarDefinition = VarDefinition::new_lazy(
    "max_credit_consumption_rate",
    lazy_value!(Numeric; || 1024.into()),
    "The maximum rate of credit consumption in a region. Credits are consumed based on the size of cluster replicas in use (Materialize).",
    true,
)
.with_constraint(&NUMERIC_NON_NEGATIVE);
516
/// `max_databases`: defaults to `1000`; user-visible.
pub static MAX_DATABASES: VarDefinition = VarDefinition::new(
    "max_databases",
    value!(u32; 1000),
    "The maximum number of databases in the region (Materialize).",
    true,
);

/// `max_schemas_per_database`: defaults to `1000`; user-visible.
pub static MAX_SCHEMAS_PER_DATABASE: VarDefinition = VarDefinition::new(
    "max_schemas_per_database",
    value!(u32; 1000),
    "The maximum number of schemas in a database (Materialize).",
    true,
);

/// `max_objects_per_schema`: defaults to `1000`; user-visible.
pub static MAX_OBJECTS_PER_SCHEMA: VarDefinition = VarDefinition::new(
    "max_objects_per_schema",
    value!(u32; 1000),
    "The maximum number of objects in a schema (Materialize).",
    true,
);

/// `max_secrets`: defaults to `100`; user-visible.
pub static MAX_SECRETS: VarDefinition = VarDefinition::new(
    "max_secrets",
    value!(u32; 100),
    "The maximum number of secrets in the region, across all schemas (Materialize).",
    true,
);

/// `max_roles`: defaults to `1000`; user-visible.
pub static MAX_ROLES: VarDefinition = VarDefinition::new(
    "max_roles",
    value!(u32; 1000),
    "The maximum number of roles in the region (Materialize).",
    true,
);

/// `max_continual_tasks`: defaults to `100`; user-visible.
pub static MAX_CONTINUAL_TASKS: VarDefinition = VarDefinition::new(
    "max_continual_tasks",
    value!(u32; 100),
    "The maximum number of continual tasks in the region, across all schemas (Materialize).",
    true,
);

/// `max_network_policies`: defaults to `25`; user-visible.
pub static MAX_NETWORK_POLICIES: VarDefinition = VarDefinition::new(
    "max_network_policies",
    value!(u32; 25),
    "The maximum number of network policies in the region.",
    true,
);

/// `max_rules_per_network_policy`: defaults to `25`; user-visible.
pub static MAX_RULES_PER_NETWORK_POLICY: VarDefinition = VarDefinition::new(
    "max_rules_per_network_policy",
    value!(u32; 25),
    "The maximum number of rules per network policies.",
    true,
);
572
// Cloud environmentd is configured with 4 GiB of RAM, so 1 GiB is a good heuristic for a single
// query.
//
// We constrain this parameter to a minimum of 1MB, to avoid accidental usage of values that will
// interfere with queries executed by the system itself.
//
// TODO(jkosh44) Eventually we want to be able to return arbitrary sized results.
/// `max_result_size`: defaults to `ByteSize::gb(1)`; user-visible; constrained by
/// `BYTESIZE_AT_LEAST_1MB`.
pub static MAX_RESULT_SIZE: VarDefinition = VarDefinition::new(
    "max_result_size",
    value!(ByteSize; ByteSize::gb(1)),
    "The maximum size in bytes for an internal query result (Materialize).",
    true,
)
.with_constraint(&BYTESIZE_AT_LEAST_1MB);

/// `max_query_result_size`: defaults to `ByteSize::gb(1)`; user-visible. Unlike
/// `max_result_size`, no constraint is attached here.
pub static MAX_QUERY_RESULT_SIZE: VarDefinition = VarDefinition::new(
    "max_query_result_size",
    value!(ByteSize; ByteSize::gb(1)),
    "The maximum size in bytes for a single query's result (Materialize).",
    true,
);

/// `max_copy_from_row_size`: defaults to `ByteSize::mb(128)`; user-visible.
pub static MAX_COPY_FROM_ROW_SIZE: VarDefinition = VarDefinition::new(
    "max_copy_from_row_size",
    value!(ByteSize; ByteSize::mb(128)),
    "The maximum size in bytes for a single COPY FROM STDIN row (Materialize).",
    true,
);

/// `max_identifier_length`: defaults to the lexer's `MAX_IDENTIFIER_LENGTH` constant;
/// user-visible.
pub static MAX_IDENTIFIER_LENGTH: VarDefinition = VarDefinition::new(
    "max_identifier_length",
    value!(usize; mz_sql_lexer::lexer::MAX_IDENTIFIER_LENGTH),
    "The maximum length of object identifiers in bytes (PostgreSQL).",
    true,
);
608
/// `welcome_message`: defaults to `true`; user-visible.
pub static WELCOME_MESSAGE: VarDefinition = VarDefinition::new(
    "welcome_message",
    value!(bool; true),
    "Whether to send a notice with a welcome message after a successful connection (Materialize).",
    true,
);

/// The logical compaction window for builtin tables and sources that have the
/// `retained_metrics_relation` flag set.
///
/// The existence of this variable is a bit of a hack until we have a fully
/// general solution for controlling retention windows.
///
/// Defaults to 30 days; not user-visible.
pub static METRICS_RETENTION: VarDefinition = VarDefinition::new(
    "metrics_retention",
    // 30 days
    value!(Duration; Duration::from_secs(30 * 24 * 60 * 60)),
    "The time to retain cluster utilization metrics (Materialize).",
    false,
);

/// `allowed_cluster_replica_sizes`: defaults to an empty list; user-visible.
pub static ALLOWED_CLUSTER_REPLICA_SIZES: VarDefinition = VarDefinition::new(
    "allowed_cluster_replica_sizes",
    value!(Vec<Ident>; Vec::new()),
    "The allowed sizes when creating a new cluster replica (Materialize).",
    true,
);

/// `persist_fast_path_limit`: defaults to `25`; not user-visible.
pub static PERSIST_FAST_PATH_LIMIT: VarDefinition = VarDefinition::new(
    "persist_fast_path_limit",
    value!(usize; 25),
    "An exclusive upper bound on the number of results we may return from a Persist fast-path peek; \
    queries that may return more results will follow the normal / slow path. \
    Setting this to 0 disables the feature.",
    false,
);
644
/// Controls `mz_adapter::coord::timestamp_oracle::postgres_oracle::DynamicConfig::pg_connection_pool_max_size`.
///
/// Defaults to `DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE`; not user-visible.
pub static PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE: VarDefinition = VarDefinition::new(
    "pg_timestamp_oracle_connection_pool_max_size",
    value!(usize; DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE),
    "Maximum size of the Postgres/CRDB connection pool, used by the Postgres/CRDB timestamp oracle.",
    false,
);

/// Controls `mz_adapter::coord::timestamp_oracle::postgres_oracle::DynamicConfig::pg_connection_pool_max_wait`.
///
/// Defaults to `Some(DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT)`; not user-visible.
pub static PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT: VarDefinition = VarDefinition::new(
    "pg_timestamp_oracle_connection_pool_max_wait",
    value!(Option<Duration>; Some(DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT)),
    "The maximum time to wait when attempting to obtain a connection from the Postgres/CRDB connection pool, used by the Postgres/CRDB timestamp oracle.",
    false,
);

/// Controls `mz_adapter::coord::timestamp_oracle::postgres_oracle::DynamicConfig::pg_connection_pool_ttl`.
///
/// Defaults to `DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL`; not user-visible.
pub static PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL: VarDefinition = VarDefinition::new(
    "pg_timestamp_oracle_connection_pool_ttl",
    value!(Duration; DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL),
    "The minimum TTL of a Consensus connection to Postgres/CRDB before it is proactively terminated",
    false,
);

/// Controls `mz_adapter::coord::timestamp_oracle::postgres_oracle::DynamicConfig::pg_connection_pool_ttl_stagger`.
///
/// Defaults to `DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER`; not user-visible.
pub static PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER: VarDefinition = VarDefinition::new(
    "pg_timestamp_oracle_connection_pool_ttl_stagger",
    value!(Duration; DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER),
    "The minimum time between TTLing Consensus connections to Postgres/CRDB.",
    false,
);
676
/// `unsafe_new_transaction_wall_time`: defaults to `None`. Declared user-visible for the
/// reason given in the inline comment below.
pub static UNSAFE_NEW_TRANSACTION_WALL_TIME: VarDefinition = VarDefinition::new(
    "unsafe_new_transaction_wall_time",
    value!(Option<CheckedTimestamp<DateTime<Utc>>>; None),
    "Sets the wall time for all new explicit or implicit transactions to control the value of `now()`. \
    If not set, uses the system's clock.",
    // This needs to be true because `user_visible: false` things are only modifiable by the mz_system
    // and mz_support users, and we want sqllogictest to have access with its user. Because the name
    // starts with "unsafe" it still won't be visible or changeable by users unless unsafe mode is
    // enabled.
    true,
);

/// `scram_iterations`: defaults to `600_000`, stored as a `NonZeroU32`; user-visible.
pub static SCRAM_ITERATIONS: VarDefinition = VarDefinition::new(
    "scram_iterations",
    // The default iteration count as suggested by
    // <https://cheatsheetseries.owasp.org/cheatsheets/Password_Storage_Cheat_Sheet.html>
    value!(NonZeroU32; NonZeroU32::new(600_000).unwrap()),
    "Iterations to use when hashing passwords. Higher iterations are more secure, but take longer to validated. \
    Please consider the security risks before reducing this below the default value.",
    true,
);
698
699/// Tuning for RocksDB used by `UPSERT` sources that takes effect on restart.
700pub mod upsert_rocksdb {
701    use super::*;
702    use mz_rocksdb_types::config::{CompactionStyle, CompressionType};
703
704    pub static UPSERT_ROCKSDB_COMPACTION_STYLE: VarDefinition = VarDefinition::new(
705        "upsert_rocksdb_compaction_style",
706        value!(CompactionStyle; mz_rocksdb_types::defaults::DEFAULT_COMPACTION_STYLE),
707        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
708        sources. Described in the `mz_rocksdb_types::config` module. \
709        Only takes effect on source restart (Materialize).",
710        false,
711    );
712
713    pub static UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET: VarDefinition =
714        VarDefinition::new(
715            "upsert_rocksdb_optimize_compaction_memtable_budget",
716            value!(usize; mz_rocksdb_types::defaults::DEFAULT_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET),
717            "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
718        sources. Described in the `mz_rocksdb_types::config` module. \
719        Only takes effect on source restart (Materialize).",
720            false,
721        );
722
723    pub static UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES: VarDefinition =
724        VarDefinition::new(
725            "upsert_rocksdb_level_compaction_dynamic_level_bytes",
726            value!(bool; mz_rocksdb_types::defaults::DEFAULT_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES),
727            "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
728        sources. Described in the `mz_rocksdb_types::config` module. \
729        Only takes effect on source restart (Materialize).",
730            false,
731        );
732
733    pub static UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO: VarDefinition = VarDefinition::new(
734        "upsert_rocksdb_universal_compaction_ratio",
735        value!(i32; mz_rocksdb_types::defaults::DEFAULT_UNIVERSAL_COMPACTION_RATIO),
736        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
737        sources. Described in the `mz_rocksdb_types::config` module. \
738        Only takes effect on source restart (Materialize).",
739        false,
740    );
741
742    pub static UPSERT_ROCKSDB_PARALLELISM: VarDefinition = VarDefinition::new(
743        "upsert_rocksdb_parallelism",
744        value!(Option<i32>; mz_rocksdb_types::defaults::DEFAULT_PARALLELISM),
745        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
746        sources. Described in the `mz_rocksdb_types::config` module. \
747        Only takes effect on source restart (Materialize).",
748        false,
749    );
750
751    pub static UPSERT_ROCKSDB_COMPRESSION_TYPE: VarDefinition = VarDefinition::new(
752        "upsert_rocksdb_compression_type",
753        value!(CompressionType; mz_rocksdb_types::defaults::DEFAULT_COMPRESSION_TYPE),
754        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
755        sources. Described in the `mz_rocksdb_types::config` module. \
756        Only takes effect on source restart (Materialize).",
757        false,
758    );
759
760    pub static UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE: VarDefinition = VarDefinition::new(
761        "upsert_rocksdb_bottommost_compression_type",
762        value!(CompressionType; mz_rocksdb_types::defaults::DEFAULT_BOTTOMMOST_COMPRESSION_TYPE),
763        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
764        sources. Described in the `mz_rocksdb_types::config` module. \
765        Only takes effect on source restart (Materialize).",
766        false,
767    );
768
769    pub static UPSERT_ROCKSDB_BATCH_SIZE: VarDefinition = VarDefinition::new(
770        "upsert_rocksdb_batch_size",
771        value!(usize; mz_rocksdb_types::defaults::DEFAULT_BATCH_SIZE),
772        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
773        sources. Described in the `mz_rocksdb_types::config` module. \
774        Can be changed dynamically (Materialize).",
775        false,
776    );
777
778    pub static UPSERT_ROCKSDB_RETRY_DURATION: VarDefinition = VarDefinition::new(
779        "upsert_rocksdb_retry_duration",
780        value!(Duration; mz_rocksdb_types::defaults::DEFAULT_RETRY_DURATION),
781        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
782        sources. Described in the `mz_rocksdb_types::config` module. \
783        Only takes effect on source restart (Materialize).",
784        false,
785    );
786
    /// Variable `upsert_rocksdb_stats_log_interval_seconds`: a `u32` RocksDB tuning knob
    /// for `UPSERT/DEBEZIUM` sources, defaulting to
    /// `mz_rocksdb_types::defaults::DEFAULT_STATS_LOG_INTERVAL_S`.
    ///
    /// Per the description, a new value only takes effect on source restart.
    pub static UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS: VarDefinition = VarDefinition::new(
        "upsert_rocksdb_stats_log_interval_seconds",
        value!(u32; mz_rocksdb_types::defaults::DEFAULT_STATS_LOG_INTERVAL_S),
        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
        sources. Described in the `mz_rocksdb_types::config` module. \
        Only takes effect on source restart (Materialize).",
        false,
    );
795
    /// Variable `upsert_rocksdb_stats_persist_interval_seconds`: a `u32` RocksDB tuning
    /// knob for `UPSERT/DEBEZIUM` sources, defaulting to
    /// `mz_rocksdb_types::defaults::DEFAULT_STATS_PERSIST_INTERVAL_S`.
    ///
    /// Per the description, a new value only takes effect on source restart.
    pub static UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS: VarDefinition = VarDefinition::new(
        "upsert_rocksdb_stats_persist_interval_seconds",
        value!(u32; mz_rocksdb_types::defaults::DEFAULT_STATS_PERSIST_INTERVAL_S),
        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
        sources. Described in the `mz_rocksdb_types::config` module. \
        Only takes effect on source restart (Materialize).",
        false,
    );
804
    /// Variable `upsert_rocksdb_point_lookup_block_cache_size_mb`: an optional `u32`
    /// RocksDB tuning knob for `UPSERT/DEBEZIUM` sources. Unset (`None`) by default.
    ///
    /// Per the description, a new value only takes effect on source restart.
    pub static UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB: VarDefinition = VarDefinition::new(
        "upsert_rocksdb_point_lookup_block_cache_size_mb",
        value!(Option<u32>; None),
        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
        sources. Described in the `mz_rocksdb_types::config` module. \
        Only takes effect on source restart (Materialize).",
        false,
    );
813
    /// The ratio by which allocated buffers will be shrunk in upsert RocksDB.
    /// If the value is 0, then no shrinking will occur.
    ///
    /// Defaults to `mz_rocksdb_types::defaults::DEFAULT_SHRINK_BUFFERS_BY_RATIO`.
    pub static UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO: VarDefinition = VarDefinition::new(
        "upsert_rocksdb_shrink_allocated_buffers_by_ratio",
        value!(usize; mz_rocksdb_types::defaults::DEFAULT_SHRINK_BUFFERS_BY_RATIO),
        "The number of times by which allocated buffers will be shrinked in upsert rocksdb.",
        false,
    );
822
823    /// Only used if `upsert_rocksdb_write_buffer_manager_memory_bytes` is also set
824    /// and write buffer manager is enabled
825    pub static UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION: VarDefinition =
826        VarDefinition::new(
827            "upsert_rocksdb_write_buffer_manager_cluster_memory_fraction",
828            value!(Option<Numeric>; None),
829            "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
830        sources. Described in the `mz_rocksdb_types::config` module. \
831        Only takes effect on source restart (Materialize).",
832            false,
833        );
834
    /// Optional `usize` knob `upsert_rocksdb_write_buffer_manager_memory_bytes`; unset
    /// (`None`) by default.
    ///
    /// This needs to be set for the write buffer manager to be used.
    pub static UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES: VarDefinition = VarDefinition::new(
        "upsert_rocksdb_write_buffer_manager_memory_bytes",
        value!(Option<usize>; None),
        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
        sources. Described in the `mz_rocksdb_types::config` module. \
        Only takes effect on source restart (Materialize).",
        false,
    );
845
    /// Variable `upsert_rocksdb_write_buffer_manager_allow_stall`: a `bool` RocksDB tuning
    /// knob for `UPSERT/DEBEZIUM` sources. Default: `false`.
    ///
    /// Per the description, a new value only takes effect on source restart.
    pub static UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL: VarDefinition = VarDefinition::new(
        "upsert_rocksdb_write_buffer_manager_allow_stall",
        value!(bool; false),
        "Tuning parameter for RocksDB as used in `UPSERT/DEBEZIUM` \
        sources. Described in the `mz_rocksdb_types::config` module. \
        Only takes effect on source restart (Materialize).",
        false,
    );
854}
855
856pub static LOGGING_FILTER: VarDefinition = VarDefinition::new_lazy(
857    "log_filter",
858    lazy_value!(CloneableEnvFilter; || CloneableEnvFilter::from_str("info").expect("valid EnvFilter")),
859    "Sets the filter to apply to stderr logging.",
860    false,
861);
862
863pub static OPENTELEMETRY_FILTER: VarDefinition = VarDefinition::new_lazy(
864    "opentelemetry_filter",
865    lazy_value!(CloneableEnvFilter; || CloneableEnvFilter::from_str("info").expect("valid EnvFilter")),
866    "Sets the filter to apply to OpenTelemetry-backed distributed tracing.",
867    false,
868);
869
870pub static LOGGING_FILTER_DEFAULTS: VarDefinition = VarDefinition::new_lazy(
871    "log_filter_defaults",
872    lazy_value!(Vec<SerializableDirective>; || {
873        mz_ore::tracing::LOGGING_DEFAULTS
874            .iter()
875            .map(|d| d.clone().into())
876            .collect()
877    }),
878    "Sets additional default directives to apply to stderr logging. \
879        These apply to all variations of `log_filter`. Directives other than \
880        `module=off` are likely incorrect.",
881    false,
882);
883
884pub static OPENTELEMETRY_FILTER_DEFAULTS: VarDefinition = VarDefinition::new_lazy(
885    "opentelemetry_filter_defaults",
886    lazy_value!(Vec<SerializableDirective>; || {
887        mz_ore::tracing::OPENTELEMETRY_DEFAULTS
888            .iter()
889            .map(|d| d.clone().into())
890            .collect()
891    }),
892    "Sets additional default directives to apply to OpenTelemetry-backed \
893        distributed tracing. \
894        These apply to all variations of `opentelemetry_filter`. Directives other than \
895        `module=off` are likely incorrect.",
896    false,
897);
898
899pub static SENTRY_FILTERS: VarDefinition = VarDefinition::new_lazy(
900    "sentry_filters",
901    lazy_value!(Vec<SerializableDirective>; || {
902        mz_ore::tracing::SENTRY_DEFAULTS
903            .iter()
904            .map(|d| d.clone().into())
905            .collect()
906    }),
907    "Sets additional default directives to apply to sentry logging. \
908        These apply on top of a default `info` directive. Directives other than \
909        `module=off` are likely incorrect.",
910    false,
911);
912
913pub static WEBHOOKS_SECRETS_CACHING_TTL_SECS: VarDefinition = VarDefinition::new_lazy(
914    "webhooks_secrets_caching_ttl_secs",
915    lazy_value!(usize; || {
916        usize::cast_from(mz_secrets::cache::DEFAULT_TTL_SECS)
917    }),
918    "Sets the time-to-live for values in the Webhooks secrets cache.",
919    false,
920);
921
/// Variable `coord_slow_message_warn_threshold`: the duration after which a coordinator
/// message is reported as slow. Default: 30 seconds.
pub static COORD_SLOW_MESSAGE_WARN_THRESHOLD: VarDefinition = VarDefinition::new(
    "coord_slow_message_warn_threshold",
    value!(Duration; Duration::from_secs(30)),
    "Sets the threshold at which we will error! for a coordinator message being slow.",
    false,
);
928
/// Controls the connect_timeout setting when connecting to PG via `mz_postgres_util`.
///
/// Defaults to `DEFAULT_PG_SOURCE_CONNECT_TIMEOUT`.
pub static PG_SOURCE_CONNECT_TIMEOUT: VarDefinition = VarDefinition::new(
    "pg_source_connect_timeout",
    value!(Duration; DEFAULT_PG_SOURCE_CONNECT_TIMEOUT),
    "Sets the timeout applied to socket-level connection attempts for PG \
    replication connections (Materialize).",
    false,
);
937
/// Sets the maximum number of TCP keepalive probes that will be sent before dropping a
/// connection when connecting to PG via `mz_postgres_util`.
///
/// Defaults to `DEFAULT_PG_SOURCE_TCP_KEEPALIVES_RETRIES`.
pub static PG_SOURCE_TCP_KEEPALIVES_RETRIES: VarDefinition = VarDefinition::new(
    "pg_source_tcp_keepalives_retries",
    value!(u32; DEFAULT_PG_SOURCE_TCP_KEEPALIVES_RETRIES),
    "Sets the maximum number of TCP keepalive probes that will be sent before dropping \
    a connection when connecting to PG via `mz_postgres_util` (Materialize).",
    false,
);
947
/// Sets the amount of idle time before a keepalive packet is sent on the connection when
/// connecting to PG via `mz_postgres_util`.
///
/// Defaults to `DEFAULT_PG_SOURCE_TCP_KEEPALIVES_IDLE`.
pub static PG_SOURCE_TCP_KEEPALIVES_IDLE: VarDefinition = VarDefinition::new(
    "pg_source_tcp_keepalives_idle",
    value!(Duration; DEFAULT_PG_SOURCE_TCP_KEEPALIVES_IDLE),
    "Sets the amount of idle time before a keepalive packet is sent on the connection \
        when connecting to PG via `mz_postgres_util` (Materialize).",
    false,
);
957
/// Sets the time interval between TCP keepalive probes when connecting to PG via
/// `mz_postgres_util`.
///
/// Defaults to `DEFAULT_PG_SOURCE_TCP_KEEPALIVES_INTERVAL`.
pub static PG_SOURCE_TCP_KEEPALIVES_INTERVAL: VarDefinition = VarDefinition::new(
    "pg_source_tcp_keepalives_interval",
    value!(Duration; DEFAULT_PG_SOURCE_TCP_KEEPALIVES_INTERVAL),
    "Sets the time interval between TCP keepalive probes when connecting to PG via \
        replication (Materialize).",
    false,
);
966
/// Sets the TCP user timeout when connecting to PG via `mz_postgres_util`.
///
/// Defaults to `DEFAULT_PG_SOURCE_TCP_USER_TIMEOUT`.
pub static PG_SOURCE_TCP_USER_TIMEOUT: VarDefinition = VarDefinition::new(
    "pg_source_tcp_user_timeout",
    value!(Duration; DEFAULT_PG_SOURCE_TCP_USER_TIMEOUT),
    "Sets the TCP user timeout when connecting to PG via `mz_postgres_util` (Materialize).",
    false,
);
974
975/// Sets whether to apply the TCP configuration parameters on the server when
976/// connecting to PG via `mz_postgres_util`.
977pub static PG_SOURCE_TCP_CONFIGURE_SERVER: VarDefinition = VarDefinition::new(
978    "pg_source_tcp_configure_server",
979    value!(bool; DEFAULT_PG_SOURCE_TCP_CONFIGURE_SERVER),
980    "Sets whether to apply the TCP configuration parameters on the server when connecting to PG via `mz_postgres_util` (Materialize).",
981    false,
982);
983
984/// Sets the `statement_timeout` value to use during the snapshotting phase of
985/// PG sources.
986pub static PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT: VarDefinition = VarDefinition::new(
987    "pg_source_snapshot_statement_timeout",
988    value!(Duration; mz_postgres_util::DEFAULT_SNAPSHOT_STATEMENT_TIMEOUT),
989    "Sets the `statement_timeout` value to use during the snapshotting phase of PG sources (Materialize)",
990    false,
991);
992
993/// Sets the `wal_sender_timeout` value to use during the replication phase of
994/// PG sources.
995pub static PG_SOURCE_WAL_SENDER_TIMEOUT: VarDefinition = VarDefinition::new(
996    "pg_source_wal_sender_timeout",
997    value!(Option<Duration>; DEFAULT_PG_SOURCE_WAL_SENDER_TIMEOUT),
998    "Sets the `wal_sender_timeout` value to use during the replication phase of PG sources (Materialize)",
999    false,
1000);
1001
/// Variable `pg_source_snapshot_collect_strict_count`; please see
/// `PgSourceSnapshotConfig`.
///
/// The default is read from the `collect_strict_count` field of
/// `mz_storage_types::parameters::PgSourceSnapshotConfig::new()`.
pub static PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT: VarDefinition = VarDefinition::new(
    "pg_source_snapshot_collect_strict_count",
    value!(bool; mz_storage_types::parameters::PgSourceSnapshotConfig::new().collect_strict_count),
    "Please see <https://dev.materialize.com/api/rust-private\
        /mz_storage_types/parameters\
        /struct.PgSourceSnapshotConfig.html#structfield.collect_strict_count>",
    false,
);
1011
/// Sets the time between TCP keepalive probes when connecting to MySQL via `mz_mysql_util`.
///
/// Defaults to `mz_mysql_util::DEFAULT_TCP_KEEPALIVE`.
pub static MYSQL_SOURCE_TCP_KEEPALIVE: VarDefinition = VarDefinition::new(
    "mysql_source_tcp_keepalive",
    value!(Duration; mz_mysql_util::DEFAULT_TCP_KEEPALIVE),
    "Sets the time between TCP keepalive probes when connecting to MySQL",
    false,
);
1019
1020/// Sets the `max_execution_time` value to use during the snapshotting phase of
1021/// MySQL sources.
1022pub static MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME: VarDefinition = VarDefinition::new(
1023    "mysql_source_snapshot_max_execution_time",
1024    value!(Duration; mz_mysql_util::DEFAULT_SNAPSHOT_MAX_EXECUTION_TIME),
1025    "Sets the `max_execution_time` value to use during the snapshotting phase of MySQL sources (Materialize)",
1026    false,
1027);
1028
1029/// Sets the `lock_wait_timeout` value to use during the snapshotting phase of
1030/// MySQL sources.
1031pub static MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT: VarDefinition = VarDefinition::new(
1032    "mysql_source_snapshot_lock_wait_timeout",
1033    value!(Duration; mz_mysql_util::DEFAULT_SNAPSHOT_LOCK_WAIT_TIMEOUT),
1034    "Sets the `lock_wait_timeout` value to use during the snapshotting phase of MySQL sources (Materialize)",
1035    false,
1036);
1037
/// Sets the timeout for establishing an authenticated connection to MySQL.
///
/// Defaults to `mz_mysql_util::DEFAULT_CONNECT_TIMEOUT`.
pub static MYSQL_SOURCE_CONNECT_TIMEOUT: VarDefinition = VarDefinition::new(
    "mysql_source_connect_timeout",
    value!(Duration; mz_mysql_util::DEFAULT_CONNECT_TIMEOUT),
    "Sets the timeout for establishing an authenticated connection to MySQL",
    false,
);
1045
/// Controls the check interval for connections to SSH bastions via `mz_ssh_util`.
///
/// Defaults to `mz_ssh_util::tunnel::DEFAULT_CHECK_INTERVAL`.
pub static SSH_CHECK_INTERVAL: VarDefinition = VarDefinition::new(
    "ssh_check_interval",
    value!(Duration; mz_ssh_util::tunnel::DEFAULT_CHECK_INTERVAL),
    "Controls the check interval for connections to SSH bastions via `mz_ssh_util`.",
    false,
);
1053
/// Controls the connect timeout for connections to SSH bastions via `mz_ssh_util`.
///
/// Defaults to `mz_ssh_util::tunnel::DEFAULT_CONNECT_TIMEOUT`.
pub static SSH_CONNECT_TIMEOUT: VarDefinition = VarDefinition::new(
    "ssh_connect_timeout",
    value!(Duration; mz_ssh_util::tunnel::DEFAULT_CONNECT_TIMEOUT),
    "Controls the connect timeout for connections to SSH bastions via `mz_ssh_util`.",
    false,
);
1061
/// Controls the keepalive idle interval for connections to SSH bastions via `mz_ssh_util`.
///
/// Defaults to `mz_ssh_util::tunnel::DEFAULT_KEEPALIVES_IDLE`.
pub static SSH_KEEPALIVES_IDLE: VarDefinition = VarDefinition::new(
    "ssh_keepalives_idle",
    value!(Duration; mz_ssh_util::tunnel::DEFAULT_KEEPALIVES_IDLE),
    "Controls the keepalive idle interval for connections to SSH bastions via `mz_ssh_util`.",
    false,
);
1069
/// Enables `socket.keepalive.enable` for rdkafka client connections.
///
/// Defaults to `mz_kafka_util::client::DEFAULT_KEEPALIVE`, which the description
/// documents as true.
pub static KAFKA_SOCKET_KEEPALIVE: VarDefinition = VarDefinition::new(
    "kafka_socket_keepalive",
    value!(bool; mz_kafka_util::client::DEFAULT_KEEPALIVE),
    "Enables `socket.keepalive.enable` for rdkafka client connections. Defaults to true.",
    false,
);
1077
/// Controls `socket.timeout.ms` for rdkafka client connections.
///
/// The variable itself is unset (`None`) by default. Per the description, the effective
/// default is then the rdkafka default (60000ms) or the set transaction timeout + 100ms,
/// whichever is smaller. Cannot be greater than 300000ms, more than 100ms greater than
/// `kafka_transaction_timeout`, or less than 10ms.
pub static KAFKA_SOCKET_TIMEOUT: VarDefinition = VarDefinition::new(
    "kafka_socket_timeout",
    value!(Option<Duration>; None),
    "Controls `socket.timeout.ms` for rdkafka \
        client connections. Defaults to the rdkafka default (60000ms) or \
        the set transaction timeout + 100ms, whichever one is smaller. \
        Cannot be greater than 300000ms, more than 100ms greater than \
        `kafka_transaction_timeout`, or less than 10ms.",
    false,
);
1091
1092/// Controls `transaction.timeout.ms` for rdkafka client connections. Defaults to the rdkafka default
1093/// (60000ms). Cannot be greater than `i32::MAX` or less than 1000ms.
1094pub static KAFKA_TRANSACTION_TIMEOUT: VarDefinition = VarDefinition::new(
1095    "kafka_transaction_timeout",
1096    value!(Duration; mz_kafka_util::client::DEFAULT_TRANSACTION_TIMEOUT),
1097    "Controls `transaction.timeout.ms` for rdkafka \
1098        client connections. Defaults to the 10min. \
1099        Cannot be greater than `i32::MAX` or less than 1000ms.",
1100    false,
1101);
1102
/// Controls `socket.connection.setup.timeout.ms` for rdkafka client connections.
///
/// Defaults to `mz_kafka_util::client::DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT`, which
/// the description documents as the rdkafka default (30000ms). Cannot be greater than
/// `i32::MAX` or less than 1000ms.
pub static KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT: VarDefinition = VarDefinition::new(
    "kafka_socket_connection_setup_timeout",
    value!(Duration; mz_kafka_util::client::DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT),
    "Controls `socket.connection.setup.timeout.ms` for rdkafka \
        client connections. Defaults to the rdkafka default (30000ms). \
        Cannot be greater than `i32::MAX` or less than 1000ms",
    false,
);
1113
/// Controls the timeout when fetching kafka metadata.
///
/// Defaults to `mz_kafka_util::client::DEFAULT_FETCH_METADATA_TIMEOUT`, which the
/// description documents as 10s.
pub static KAFKA_FETCH_METADATA_TIMEOUT: VarDefinition = VarDefinition::new(
    "kafka_fetch_metadata_timeout",
    value!(Duration; mz_kafka_util::client::DEFAULT_FETCH_METADATA_TIMEOUT),
    "Controls the timeout when fetching kafka metadata. \
        Defaults to 10s.",
    false,
);
1122
/// Controls the timeout when fetching kafka progress records.
///
/// The variable itself is unset (`None`) by default. Per the description, the effective
/// default is then 60s or the transaction timeout, whichever is larger.
pub static KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT: VarDefinition = VarDefinition::new(
    "kafka_progress_record_fetch_timeout",
    value!(Option<Duration>; None),
    "Controls the timeout when fetching kafka progress records. \
        Defaults to 60s or the transaction timeout, whichever one is larger.",
    false,
);
1131
/// The maximum number of in-flight bytes emitted by persist_sources feeding _storage
/// dataflows_.
///
/// Currently defaults to 256MiB = 268435456 bytes (`Some(256 * 1024 * 1024)`).
///
/// Note: Backpressure will only be turned on if disk is enabled, based on the
/// `storage_dataflow_max_inflight_bytes_disk_only` flag.
pub static STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES: VarDefinition = VarDefinition::new(
    "storage_dataflow_max_inflight_bytes",
    value!(Option<usize>; Some(256 * 1024 * 1024)),
    "The maximum number of in-flight bytes emitted by persist_sources feeding \
        storage dataflows. Defaults to backpressure enabled (Materialize).",
    false,
);
1144
1145/// Configuration ratio to shrink unusef buffers in upsert by.
1146/// For eg: is 2 is set, then the buffers will be reduced by 2 i.e. halved.
1147/// Default is 0, which means shrinking is disabled.
1148pub static STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO: VarDefinition = VarDefinition::new(
1149    "storage_shrink_upsert_unused_buffers_by_ratio",
1150    value!(usize; 0),
1151    "Configuration ratio to shrink unusef buffers in upsert by",
1152    false,
1153);
1154
/// The fraction of the cluster replica size to be used as the maximum number of
/// in-flight bytes emitted by persist_sources feeding storage dataflows.
///
/// If not configured, the `storage_dataflow_max_inflight_bytes` value will be used.
/// For this value to be used, `storage_dataflow_max_inflight_bytes` needs to be set.
///
/// Default: `Some(0.01)` as a `Numeric`.
pub static STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION: VarDefinition =
    VarDefinition::new_lazy(
        "storage_dataflow_max_inflight_bytes_to_cluster_size_fraction",
        lazy_value!(Option<Numeric>; || Some(0.01.into())),
        "The fraction of the cluster replica size to be used as the maximum number of \
            in-flight bytes emitted by persist_sources feeding storage dataflows. \
            If not configured, the storage_dataflow_max_inflight_bytes value will be used.",
        false,
    );
1168
/// Variable `storage_dataflow_max_inflight_bytes_disk_only`: whether
/// `storage_dataflow_max_inflight_bytes` applies only to upsert dataflows using disks.
/// Default: `true`.
pub static STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY: VarDefinition = VarDefinition::new(
    "storage_dataflow_max_inflight_bytes_disk_only",
    value!(bool; true),
    "Whether or not `storage_dataflow_max_inflight_bytes` applies only to \
        upsert dataflows using disks. Defaults to true (Materialize).",
    false,
);
1176
/// The interval to submit statistics to `mz_source_statistics_per_worker` and
/// `mz_sink_statistics_per_worker`.
///
/// Defaults to `mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT`.
///
/// NOTE(review): the user-facing description names `mz_sink_statistics` rather than
/// `mz_sink_statistics_per_worker`; confirm which relation is meant.
pub static STORAGE_STATISTICS_INTERVAL: VarDefinition = VarDefinition::new(
    "storage_statistics_interval",
    value!(Duration; mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT),
    "The interval to submit statistics to `mz_source_statistics_per_worker` \
        and `mz_sink_statistics` (Materialize).",
    false,
);
1185
/// The interval to collect statistics for `mz_source_statistics_per_worker` and
/// `mz_sink_statistics_per_worker` in clusterd. Controls the accuracy of metrics.
///
/// Defaults to `mz_storage_types::parameters::STATISTICS_COLLECTION_INTERVAL_DEFAULT`.
pub static STORAGE_STATISTICS_COLLECTION_INTERVAL: VarDefinition = VarDefinition::new(
    "storage_statistics_collection_interval",
    value!(Duration; mz_storage_types::parameters::STATISTICS_COLLECTION_INTERVAL_DEFAULT),
    "The interval to collect statistics for `mz_source_statistics_per_worker` \
        and `mz_sink_statistics_per_worker` in clusterd. Controls the accuracy of metrics \
        (Materialize).",
    false,
);
1196
/// Variable `storage_record_source_sink_namespaced_errors`: whether to record namespaced
/// errors in the status history tables. Default: `true`.
pub static STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS: VarDefinition = VarDefinition::new(
    "storage_record_source_sink_namespaced_errors",
    value!(bool; true),
    "Whether or not to record namespaced errors in the status history tables",
    false,
);
1203
1204/// Boolean flag indicating whether to enable syncing from
1205/// LaunchDarkly. Can be turned off as an emergency measure to still
1206/// be able to alter parameters while LD is broken.
1207pub static ENABLE_LAUNCHDARKLY: VarDefinition = VarDefinition::new(
1208    "enable_launchdarkly",
1209    value!(bool; true),
1210    "Boolean flag indicating whether flag synchronization from LaunchDarkly should be enabled (Materialize).",
1211    false,
1212);
1213
/// Feature flag indicating whether real time recency is enabled. Default: `false`.
///
/// Note that unlike other feature flags, this is made available at the session level,
/// so it is additionally gated by the `ALLOW_REAL_TIME_RECENCY` feature flag.
pub static REAL_TIME_RECENCY: VarDefinition = VarDefinition::new(
    "real_time_recency",
    value!(bool; false),
    "Feature flag indicating whether real time recency is enabled (Materialize).",
    true,
)
.with_feature_flag(&ALLOW_REAL_TIME_RECENCY);
1224
1225pub static REAL_TIME_RECENCY_TIMEOUT: VarDefinition = VarDefinition::new(
1226    "real_time_recency_timeout",
1227    value!(Duration; Duration::from_secs(10)),
1228    "Sets the maximum allowed duration of SELECTs that actively use real-time \
1229    recency, i.e. reach out to an external system to determine their most recencly exposed \
1230    data (Materialize).",
1231    true,
1232)
1233.with_feature_flag(&ALLOW_REAL_TIME_RECENCY);
1234
1235pub static EMIT_PLAN_INSIGHTS_NOTICE: VarDefinition = VarDefinition::new(
1236    "emit_plan_insights_notice",
1237    value!(bool; false),
1238    "Boolean flag indicating whether to send a NOTICE with JSON-formatted plan insights before executing a SELECT statement (Materialize).",
1239    true,
1240);
1241
1242pub static EMIT_TIMESTAMP_NOTICE: VarDefinition = VarDefinition::new(
1243    "emit_timestamp_notice",
1244    value!(bool; false),
1245    "Boolean flag indicating whether to send a NOTICE with timestamp explanations of queries (Materialize).",
1246    true,
1247);
1248
1249pub static EMIT_TRACE_ID_NOTICE: VarDefinition = VarDefinition::new(
1250    "emit_trace_id_notice",
1251    value!(bool; false),
1252    "Boolean flag indicating whether to send a NOTICE specifying the trace id when available (Materialize).",
1253    true,
1254);
1255
/// Variable `unsafe_mock_audit_event_timestamp`: an optional `mz_repr::Timestamp` used
/// as a mocked timestamp for audit events in tests. Unset (`None`) by default.
pub static UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP: VarDefinition = VarDefinition::new(
    "unsafe_mock_audit_event_timestamp",
    value!(Option<mz_repr::Timestamp>; None),
    "Mocked timestamp to use for audit events for testing purposes",
    false,
);
1262
/// Variable `enable_rbac_checks`: the global flag for whether RBAC checks are applied
/// before executing statements. Default: `true`.
pub static ENABLE_RBAC_CHECKS: VarDefinition = VarDefinition::new(
    "enable_rbac_checks",
    value!(bool; true),
    "User facing global boolean flag indicating whether to apply RBAC checks before \
        executing statements (Materialize).",
    true,
);
1270
/// Variable `enable_session_rbac_checks`: the session-level flag for whether RBAC checks
/// are applied before executing statements. Default: `false` (see the TODO below).
pub static ENABLE_SESSION_RBAC_CHECKS: VarDefinition = VarDefinition::new(
    "enable_session_rbac_checks",
    // TODO(jkosh44) Once RBAC is enabled in all environments, change this to `true`.
    value!(bool; false),
    "User facing session boolean flag indicating whether to apply RBAC checks before \
        executing statements (Materialize).",
    true,
);
1279
/// Variable `emit_introspection_query_notice`: whether to print a notice when querying
/// per-replica introspection sources. Default: `true`.
pub static EMIT_INTROSPECTION_QUERY_NOTICE: VarDefinition = VarDefinition::new(
    "emit_introspection_query_notice",
    value!(bool; true),
    "Whether to print a notice when querying per-replica introspection sources.",
    true,
);
1286
// TODO(mgree) change this to a SelectOption
/// Variable `enable_session_cardinality_estimates`: whether to use cardinality estimates
/// when optimizing queries; does not affect `EXPLAIN WITH(cardinality)`. Default: `false`.
///
/// Gated by the `ENABLE_CARDINALITY_ESTIMATES` feature flag.
pub static ENABLE_SESSION_CARDINALITY_ESTIMATES: VarDefinition = VarDefinition::new(
    "enable_session_cardinality_estimates",
    value!(bool; false),
    "Feature flag indicating whether to use cardinality estimates when optimizing queries; \
        does not affect EXPLAIN WITH(cardinality) (Materialize).",
    true,
)
.with_feature_flag(&ENABLE_CARDINALITY_ESTIMATES);
1296
1297pub static OPTIMIZER_STATS_TIMEOUT: VarDefinition = VarDefinition::new(
1298    "optimizer_stats_timeout",
1299    value!(Duration; Duration::from_millis(250)),
1300    "Sets the timeout applied to the optimizer's statistics collection from storage; \
1301        applied to non-oneshot, i.e., long-lasting queries, like CREATE MATERIALIZED VIEW (Materialize).",
1302    false,
1303);
1304
/// Variable `optimizer_oneshot_stats_timeout`: the timeout for the optimizer's
/// statistics collection from storage on oneshot queries such as SELECT. Default: 10ms.
pub static OPTIMIZER_ONESHOT_STATS_TIMEOUT: VarDefinition = VarDefinition::new(
    "optimizer_oneshot_stats_timeout",
    value!(Duration; Duration::from_millis(10)),
    "Sets the timeout applied to the optimizer's statistics collection from storage; \
        applied to oneshot queries, like SELECT (Materialize).",
    false,
);
1312
1313pub static PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE: VarDefinition = VarDefinition::new(
1314    "privatelink_status_update_quota_per_minute",
1315    value!(u32; 20),
1316    "Sets the per-minute quota for privatelink vpc status updates to be written to \
1317        the storage-collection-backed system table. This value implies the total and burst quota per-minute.",
1318    false,
1319);
1320
1321pub static STATEMENT_LOGGING_SAMPLE_RATE: VarDefinition = VarDefinition::new_lazy(
1322    "statement_logging_sample_rate",
1323    lazy_value!(Numeric; || 0.1.into()),
1324    "User-facing session variable indicating how many statement executions should be \
1325        logged, subject to constraint by the system variable `statement_logging_max_sample_rate` (Materialize).",
1326    true,
1327).with_constraint(&NUMERIC_BOUNDED_0_1_INCLUSIVE);
1328
1329pub static ENABLE_DEFAULT_CONNECTION_VALIDATION: VarDefinition = VarDefinition::new(
1330    "enable_default_connection_validation",
1331    value!(bool; true),
1332    "LD facing global boolean flag that allows turning default connection validation off for everyone (Materialize).",
1333    false,
1334);
1335
1336pub static STATEMENT_LOGGING_MAX_DATA_CREDIT: VarDefinition = VarDefinition::new(
1337    "statement_logging_max_data_credit",
1338    value!(Option<usize>; Some(50 * 1024 * 1024)),
1339    // The idea is that during periods of low logging, tokens can accumulate up to this value,
1340    // and then be depleted during periods of high logging.
1341    "The maximum number of bytes that can be logged for statement logging in short burts, or NULL if unlimited (Materialize).",
1342    false,
1343);
1344
1345pub static STATEMENT_LOGGING_TARGET_DATA_RATE: VarDefinition = VarDefinition::new(
1346    "statement_logging_target_data_rate",
1347    value!(Option<usize>; Some(2071)),
1348    "The maximum sustained data rate of statement logging, in bytes per second, or NULL if unlimited (Materialize).",
1349    false,
1350);
1351
/// Variable `statement_logging_max_sample_rate`: the maximum rate at which statements may
/// be logged. Default: 0.99.
///
/// Per the description, if this value is less than that of
/// `statement_logging_sample_rate`, the latter is ignored. Values are checked against the
/// `NUMERIC_BOUNDED_0_1_INCLUSIVE` constraint.
pub static STATEMENT_LOGGING_MAX_SAMPLE_RATE: VarDefinition = VarDefinition::new_lazy(
    "statement_logging_max_sample_rate",
    lazy_value!(Numeric; || 0.99.into()),
    "The maximum rate at which statements may be logged. If this value is less than \
        that of `statement_logging_sample_rate`, the latter is ignored (Materialize).",
    true,
)
.with_constraint(&NUMERIC_BOUNDED_0_1_INCLUSIVE);
1360
/// Variable `statement_logging_default_sample_rate`: the default value of
/// `statement_logging_sample_rate` for new sessions. Default: 0.99.
///
/// Values are checked against the `NUMERIC_BOUNDED_0_1_INCLUSIVE` constraint.
pub static STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE: VarDefinition = VarDefinition::new_lazy(
    "statement_logging_default_sample_rate",
    lazy_value!(Numeric; || 0.99.into()),
    "The default value of `statement_logging_sample_rate` for new sessions (Materialize).",
    true,
)
.with_constraint(&NUMERIC_BOUNDED_0_1_INCLUSIVE);
1368
/// Variable `enable_internal_statement_logging`: whether to log statements from the
/// `mz_system` user. Default: `false`.
pub static ENABLE_INTERNAL_STATEMENT_LOGGING: VarDefinition = VarDefinition::new(
    "enable_internal_statement_logging",
    value!(bool; false),
    "Whether to log statements from the `mz_system` user.",
    false,
);
1375
1376pub static AUTO_ROUTE_CATALOG_QUERIES: VarDefinition = VarDefinition::new(
1377    "auto_route_catalog_queries",
1378    value!(bool; true),
1379    "Whether to force queries that depend only on system tables, to run on the mz_catalog_server cluster (Materialize).",
1380    true,
1381);
1382
/// Variable `max_connections`: the maximum number of concurrent connections.
/// Default: 5000.
pub static MAX_CONNECTIONS: VarDefinition = VarDefinition::new(
    "max_connections",
    value!(u32; 5000),
    "The maximum number of concurrent connections (PostgreSQL).",
    true,
);
1389
/// Variable `superuser_reserved_connections`: the number of connections that are
/// reserved for superusers. Default: 3.
pub static SUPERUSER_RESERVED_CONNECTIONS: VarDefinition = VarDefinition::new(
    "superuser_reserved_connections",
    value!(u32; 3),
    "The number of connections that are reserved for superusers (PostgreSQL).",
    true,
);
1396
1397/// Controls [`mz_storage_types::parameters::StorageParameters::keep_n_source_status_history_entries`].
1398pub static KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES: VarDefinition = VarDefinition::new(
1399    "keep_n_source_status_history_entries",
1400    value!(usize; 5),
1401    "On reboot, truncate all but the last n entries per ID in the source_status_history collection (Materialize).",
1402    false,
1403);
1404
1405/// Controls [`mz_storage_types::parameters::StorageParameters::keep_n_sink_status_history_entries`].
1406pub static KEEP_N_SINK_STATUS_HISTORY_ENTRIES: VarDefinition = VarDefinition::new(
1407    "keep_n_sink_status_history_entries",
1408    value!(usize; 5),
1409    "On reboot, truncate all but the last n entries per ID in the sink_status_history collection (Materialize).",
1410    false,
1411);
1412
1413/// Controls [`mz_storage_types::parameters::StorageParameters::keep_n_privatelink_status_history_entries`].
1414pub static KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES: VarDefinition = VarDefinition::new(
1415    "keep_n_privatelink_status_history_entries",
1416    value!(usize; 5),
1417    "On reboot, truncate all but the last n entries per ID in the mz_aws_privatelink_connection_status_history \
1418        collection (Materialize).",
1419    false,
1420);
1421
1422/// Controls [`mz_storage_types::parameters::StorageParameters::replica_status_history_retention_window`].
1423pub static REPLICA_STATUS_HISTORY_RETENTION_WINDOW: VarDefinition = VarDefinition::new(
1424    "replica_status_history_retention_window",
1425    value!(Duration; REPLICA_STATUS_HISTORY_RETENTION_WINDOW_DEFAULT),
1426    "On reboot, truncate up all entries past the retention window in the mz_cluster_replica_status_history \
1427        collection (Materialize).",
1428    false,
1429);
1430
/// Variable `enable_storage_shard_finalization`: whether to allow the storage client to
/// finalize shards. Default: `true`.
pub static ENABLE_STORAGE_SHARD_FINALIZATION: VarDefinition = VarDefinition::new(
    "enable_storage_shard_finalization",
    value!(bool; true),
    "Whether to allow the storage client to finalize shards (Materialize).",
    false,
);
1437
/// Variable `enable_consolidate_after_union_negate` (`bool`, default `true`).
///
/// NOTE(review): the description is a sentence fragment; presumably this toggles
/// consolidation after `Union`s that have a negated input. Confirm against the optimizer.
pub static ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE: VarDefinition = VarDefinition::new(
    "enable_consolidate_after_union_negate",
    value!(bool; true),
    "consolidation after Unions that have a Negated input (Materialize).",
    true,
);
1444
/// Variable `default_timestamp_interval`: the interval at which timestamps are assigned
/// to data from sources and tables. Default: 1000ms.
pub static DEFAULT_TIMESTAMP_INTERVAL: VarDefinition = VarDefinition::new(
    "default_timestamp_interval",
    value!(Duration; Duration::from_millis(1000)),
    "The interval at which timestamps are assigned to data from sources and tables.",
    false,
);
1451
/// Variable `min_timestamp_interval`: the minimum timestamp interval. Default: 1000ms.
pub static MIN_TIMESTAMP_INTERVAL: VarDefinition = VarDefinition::new(
    "min_timestamp_interval",
    value!(Duration; Duration::from_millis(1000)),
    "Minimum timestamp interval",
    false,
);
1458
/// Variable `max_timestamp_interval`: the maximum timestamp interval. Default: 1000ms.
pub static MAX_TIMESTAMP_INTERVAL: VarDefinition = VarDefinition::new(
    "max_timestamp_interval",
    value!(Duration; Duration::from_millis(1000)),
    "Maximum timestamp interval",
    false,
);
1465
/// System variable `webhook_concurrent_request_limit`: the maximum number of concurrent requests
/// for appending to a webhook source. Defaults to `WEBHOOK_CONCURRENCY_LIMIT`, which is defined
/// outside this section.
pub static WEBHOOK_CONCURRENT_REQUEST_LIMIT: VarDefinition = VarDefinition::new(
    "webhook_concurrent_request_limit",
    value!(usize; WEBHOOK_CONCURRENCY_LIMIT),
    "Maximum number of concurrent requests for appending to a webhook source.",
    false,
);
1472
/// System variable `user_storage_managed_collections_batch_duration`. Defaults to
/// [`STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT`].
///
/// NOTE(review): the description string below talks about "a webhook source", while the variable
/// name and its default refer to storage-managed collections. The description looks like it was
/// copied from a webhook variable; confirm the intended wording before relying on it.
pub static USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION: VarDefinition = VarDefinition::new(
    "user_storage_managed_collections_batch_duration",
    value!(Duration; STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT),
    "Duration which we'll wait to collect a batch of events for a webhook source.",
    false,
);
1479
/// System variable `network_policy`: the fallback network policy applied to all users without
/// an explicit policy. Defaults to the string `"default"`, built lazily via `lazy_value!`.
///
/// The value needs to be the name of an existing network policy; that requirement is enforced
/// on `alter_system_set`, not here.
pub static NETWORK_POLICY: VarDefinition = VarDefinition::new_lazy(
    "network_policy",
    lazy_value!(String; || "default".to_string()),
    "Sets the fallback network policy applied to all users without an explicit policy.",
    true,
);
1488
/// System variable `force_source_table_syntax`: forces use of the new source model
/// (`CREATE TABLE .. FROM SOURCE`) and migrates existing sources. Defaults to `false`.
pub static FORCE_SOURCE_TABLE_SYNTAX: VarDefinition = VarDefinition::new(
    "force_source_table_syntax",
    value!(bool; false),
    "Force use of new source model (CREATE TABLE .. FROM SOURCE) and migrate existing sources",
    true,
);
1495
/// System variable `optimizer_e2e_latency_warning_threshold`: how long a query may take to
/// compile before a warning is triggered. Defaults to 500 ms.
///
/// Per the description string, a value given without units is taken as milliseconds and a value
/// of zero disables the check.
pub static OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD: VarDefinition = VarDefinition::new(
    "optimizer_e2e_latency_warning_threshold",
    value!(Duration; Duration::from_millis(500)),
    "Sets the duration that a query can take to compile; queries that take longer \
        will trigger a warning. If this value is specified without units, it is taken as \
        milliseconds. A value of zero disables the timeout (Materialize).",
    true,
);
1504
/// Configuration for gRPC client connections.
///
/// All variables in this module are `Duration`-valued and carry a `grpc_client_` name prefix.
pub mod grpc_client {
    use super::*;

    /// `grpc_client_connect_timeout`: timeout applied to initial gRPC client connection
    /// establishment. Defaults to 5 seconds.
    pub static CONNECT_TIMEOUT: VarDefinition = VarDefinition::new(
        "grpc_client_connect_timeout",
        value!(Duration; Duration::from_secs(5)),
        "Timeout to apply to initial gRPC client connection establishment.",
        false,
    );

    /// `grpc_client_http2_keep_alive_interval`: idle time to wait before sending HTTP/2 PINGs on
    /// an established gRPC client connection. Defaults to 3 seconds.
    pub static HTTP2_KEEP_ALIVE_INTERVAL: VarDefinition = VarDefinition::new(
        "grpc_client_http2_keep_alive_interval",
        value!(Duration; Duration::from_secs(3)),
        "Idle time to wait before sending HTTP/2 PINGs to maintain established gRPC client connections.",
        false,
    );

    /// `grpc_client_http2_keep_alive_timeout`: time to wait for an HTTP/2 pong response before
    /// terminating a gRPC client connection. Defaults to 60 seconds.
    pub static HTTP2_KEEP_ALIVE_TIMEOUT: VarDefinition = VarDefinition::new(
        "grpc_client_http2_keep_alive_timeout",
        value!(Duration; Duration::from_secs(60)),
        "Time to wait for HTTP/2 pong response before terminating a gRPC client connection.",
        false,
    );
}
1530
1531/// Configuration for how cluster replicas are scheduled.
1532pub mod cluster_scheduling {
1533    use super::*;
1534    use mz_orchestrator::scheduling_config::*;
1535
1536    pub static CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT: VarDefinition = VarDefinition::new(
1537        "cluster_multi_process_replica_az_affinity_weight",
1538        value!(Option<i32>; DEFAULT_POD_AZ_AFFINITY_WEIGHT),
1539        "Whether or not to add an availability zone affinity between instances of \
1540            multi-process replicas. Either an affinity weight or empty (off) (Materialize).",
1541        false,
1542    );
1543
1544    pub static CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY: VarDefinition = VarDefinition::new(
1545        "cluster_soften_replication_anti_affinity",
1546        value!(bool; DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY),
1547        "Whether or not to turn the node-scope anti affinity between replicas \
1548            in the same cluster into a preference (Materialize).",
1549        false,
1550    );
1551
1552    pub static CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT: VarDefinition = VarDefinition::new(
1553        "cluster_soften_replication_anti_affinity_weight",
1554        value!(i32; DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT),
1555        "The preference weight for `cluster_soften_replication_anti_affinity` (Materialize).",
1556        false,
1557    );
1558
1559    pub static CLUSTER_ENABLE_TOPOLOGY_SPREAD: VarDefinition = VarDefinition::new(
1560        "cluster_enable_topology_spread",
1561        value!(bool; DEFAULT_TOPOLOGY_SPREAD_ENABLED),
1562        "Whether or not to add topology spread constraints among replicas in the same cluster (Materialize).",
1563        false,
1564    );
1565
1566    pub static CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE: VarDefinition =
1567        VarDefinition::new(
1568            "cluster_topology_spread_ignore_non_singular_scale",
1569            value!(bool; DEFAULT_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE),
1570            "If true, ignore replicas with more than 1 process when adding topology spread constraints (Materialize).",
1571            false,
1572        );
1573
1574    pub static CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW: VarDefinition = VarDefinition::new(
1575        "cluster_topology_spread_max_skew",
1576        value!(i32; DEFAULT_TOPOLOGY_SPREAD_MAX_SKEW),
1577        "The `maxSkew` for replica topology spread constraints (Materialize).",
1578        false,
1579    );
1580
1581    // `minDomains`, like maxSkew, is used to spread across a topology
1582    // key. Unlike max skew, minDomains will force node creation to ensure
1583    // distribution across a minimum number of keys.
1584    // https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/#spread-constraint-definition
1585    pub static CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS: VarDefinition = VarDefinition::new(
1586        "cluster_topology_spread_min_domains",
1587        value!(Option<i32>; None),
1588        "`minDomains` for replica topology spread constraints. \
1589            Should be set to the number of Availability Zones (Materialize).",
1590        false,
1591    );
1592
1593    pub static CLUSTER_TOPOLOGY_SPREAD_SOFT: VarDefinition = VarDefinition::new(
1594        "cluster_topology_spread_soft",
1595        value!(bool; DEFAULT_TOPOLOGY_SPREAD_SOFT),
1596        "If true, soften the topology spread constraints for replicas (Materialize).",
1597        false,
1598    );
1599
1600    pub static CLUSTER_SOFTEN_AZ_AFFINITY: VarDefinition = VarDefinition::new(
1601        "cluster_soften_az_affinity",
1602        value!(bool; DEFAULT_SOFTEN_AZ_AFFINITY),
1603        "Whether or not to turn the az-scope node affinity for replicas. \
1604            Note this could violate requests from the user (Materialize).",
1605        false,
1606    );
1607
1608    pub static CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT: VarDefinition = VarDefinition::new(
1609        "cluster_soften_az_affinity_weight",
1610        value!(i32; DEFAULT_SOFTEN_AZ_AFFINITY_WEIGHT),
1611        "The preference weight for `cluster_soften_az_affinity` (Materialize).",
1612        false,
1613    );
1614
1615    const DEFAULT_CLUSTER_ALTER_CHECK_READY_INTERVAL: Duration = Duration::from_secs(3);
1616
1617    pub static CLUSTER_ALTER_CHECK_READY_INTERVAL: VarDefinition = VarDefinition::new(
1618        "cluster_alter_check_ready_interval",
1619        value!(Duration; DEFAULT_CLUSTER_ALTER_CHECK_READY_INTERVAL),
1620        "How often to poll readiness checks for cluster alter",
1621        false,
1622    );
1623
1624    const DEFAULT_CHECK_SCHEDULING_POLICIES_INTERVAL: Duration = Duration::from_secs(3);
1625
1626    pub static CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL: VarDefinition = VarDefinition::new(
1627        "cluster_check_scheduling_policies_interval",
1628        value!(Duration; DEFAULT_CHECK_SCHEDULING_POLICIES_INTERVAL),
1629        "How often policies are invoked to automatically start/stop clusters, e.g., \
1630            for REFRESH EVERY materialized views.",
1631        false,
1632    );
1633
1634    pub static CLUSTER_SECURITY_CONTEXT_ENABLED: VarDefinition = VarDefinition::new(
1635        "cluster_security_context_enabled",
1636        value!(bool; DEFAULT_SECURITY_CONTEXT_ENABLED),
1637        "Enables SecurityContext for clusterd instances, restricting capabilities to improve security.",
1638        false,
1639    );
1640
1641    const DEFAULT_CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE: Duration = Duration::from_secs(1200);
1642
1643    pub static CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE: VarDefinition = VarDefinition::new(
1644        "cluster_refresh_mv_compaction_estimate",
1645        value!(Duration; DEFAULT_CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE),
1646        "How much time to wait for compaction after a REFRESH MV completes a refresh \
1647            before turning off the refresh cluster. This is needed because Persist does compaction \
1648            only after a write, but refresh MVs do writes only at their refresh times. \
1649            (In the long term, we'd like to remove this configuration and instead wait exactly \
1650            until compaction has settled. We'd need some new Persist API for this.)",
1651        false,
1652    );
1653}
1654
/// Macro to simplify creating feature flags, i.e. boolean flags that we use to toggle the
/// availability of features.
///
/// `feature_flags!` takes one or more `{ .. },` entries. Every field of an entry is required:
/// - `name`, which will be the name of the feature flag, in snake_case
/// - `desc`, a human-readable description of the feature (a string literal); it is embedded in
///   the variable description as `"Whether <desc> is allowed (Materialize)."`
/// - `default`, the default value of the flag
/// - `enable_for_item_parsing`, whether `SystemVars::enable_for_item_parsing` switches the flag
///   on
///
/// For every entry the macro generates a private `<NAME>_VAR` `VarDefinition`, a public `<NAME>`
/// `FeatureFlag`, and a `SystemVars::<name>()` getter. For the whole list it generates
/// `FEATURE_FLAGS`, `SystemVars::enable_all_feature_flags_by_default` and
/// `SystemVars::enable_for_item_parsing`.
///
/// Note that not all `VarDefinition<bool>` are feature flags. Feature flags are for variables that:
/// - Belong to `SystemVars`, _not_ `SessionVars`
/// - Default to false and must be explicitly enabled, or default to `true` and can be explicitly disabled.
///
/// WARNING / CONTRACT: Syntax-related feature flags must always *enable* behavior. In other words,
/// setting a feature flag must make the system more permissive. For example, let's suppose we'd like
/// to gate deprecated upsert syntax behind a feature flag. In this case, do not add a feature flag
/// like `disable_deprecated_upsert_syntax`, as `disable_deprecated_upsert_syntax = on` would
/// _prevent_ the system from parsing the deprecated upsert syntax. Instead, use a feature flag
/// like `enable_deprecated_upsert_syntax`.
///
/// The hazard this protects against is related to reboots after feature flags have been disabled.
/// Say someone creates a Kinesis source while `enable_kinesis_sources = on`. Materialize will
/// commit this source to the system catalog. Then, suppose we discover a catastrophic bug in
/// Kinesis sources and set `enable_kinesis_sources` to `off`. This prevents users from creating
/// new Kinesis sources, but leaves the existing Kinesis sources in place. This is because
/// disabling a feature flag doesn't remove access to catalog objects created while the feature
/// flag was live. On the next reboot, Materialize will proceed to load the Kinesis source from the
/// catalog, reparsing and replanning the `CREATE SOURCE` definition and rechecking the
/// `enable_kinesis_sources` feature flag along the way. Even though the feature flag has been
/// switched to `off`, we need to temporarily re-enable it during parsing and planning to be able
/// to boot successfully.
///
/// Ensuring that all syntax-related feature flags *enable* behavior means that setting all such
/// feature flags to `on` during catalog boot has the desired effect.
macro_rules! feature_flags {
    // Internal rule: defines the statics for a single flag from `name`, `desc` and `default`.
    (@inner
        // The feature flag name.
        name: $name:expr,
        // The feature flag description.
        desc: $desc:literal,
        // The feature flag default value.
        default: $value:expr,
    ) => {
        paste::paste!{
            // Note that the `VarDefinition` is not directly exported; we expect these to be
            // accessible through their `FeatureFlag` variant.
            static [<$name:upper _VAR>]: VarDefinition = VarDefinition::new(
                stringify!($name),
                value!(bool; $value),
                concat!("Whether ", $desc, " is allowed (Materialize)."),
                false,
            );

            pub static [<$name:upper >]: FeatureFlag = FeatureFlag {
                flag: &[<$name:upper _VAR>],
                feature_desc: $desc,
            };
        }
    };
    // Public rule: a non-empty, comma-terminated list of flag entries.
    ($({
        // The feature flag name.
        name: $name:expr,
        // The feature flag description.
        desc: $desc:literal,
        // The feature flag default value.
        default: $value:expr,
        // Should the feature be turned on during catalog rehydration when
        // parsing a catalog item.
        enable_for_item_parsing: $enable_for_item_parsing:expr,
    },)+) => {
        $(feature_flags! { @inner
            name: $name,
            desc: $desc,
            default: $value,
        })+

        paste::paste!{
            // All feature flag definitions, in declaration order. The `'static` lifetimes are
            // implied for references in a `static` item, so they are not spelled out.
            pub static FEATURE_FLAGS: &[&VarDefinition] = &[
                $(  & [<$name:upper _VAR>] , )+
            ];
        }

        paste::paste!{
            impl super::SystemVars {
                /// Sets the default value of every feature flag to "on".
                pub fn enable_all_feature_flags_by_default(&mut self) {
                    $(
                        self.set_default(stringify!($name), super::VarInput::Flat("on"))
                            .expect("setting default value must work");
                    )+
                }

                /// Sets the value (not the default) of every feature flag declared with
                /// `enable_for_item_parsing: true` to "on".
                pub fn enable_for_item_parsing(&mut self) {
                    $(
                        if $enable_for_item_parsing {
                            self.set(stringify!($name), super::VarInput::Flat("on"))
                                .expect("setting value must work");
                        }
                    )+
                }

                $(
                    /// Returns the current value of this feature flag.
                    pub fn [<$name:lower>](&self) -> bool {
                        *self.expect_value(&[<$name:upper _VAR>])
                    }
                )+
            }
        }
    }
}
1764
// The full list of feature flags.
//
// Each `{ .. }` entry is expanded by `feature_flags!` into a private `<NAME>_VAR`
// `VarDefinition`, a public `<NAME>` `FeatureFlag`, and a `SystemVars::<name>()` getter. Entries
// with `enable_for_item_parsing: true` are set to "on" by `SystemVars::enable_for_item_parsing`.
//
// NOTE(review): the macro renders the variable description as
// "Whether <desc> is allowed (Materialize).", so `desc` reads best as a noun phrase. Several
// entries below start with "Whether ..." or "Enable ...", or end with a period, which produces
// awkward descriptions. Changing them alters user-visible text, so they are left untouched here.
feature_flags!(
    // Gates for other feature flags
    {
        name: allow_real_time_recency,
        desc: "real time recency",
        default: false,
        enable_for_item_parsing: true,
    },
    // Actual feature flags
    {
        name: enable_guard_subquery_tablefunc,
        desc: "Whether HIR -> MIR lowering should use a new tablefunc to guard subquery sizes",
        default: true,
        enable_for_item_parsing: false,
    },
    {
        name: enable_binary_date_bin,
        desc: "the binary version of date_bin function",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_date_bin_hopping,
        desc: "the date_bin_hopping function",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_envelope_debezium_in_subscribe,
        desc: "`ENVELOPE DEBEZIUM (KEY (..))`",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_envelope_materialize,
        desc: "ENVELOPE MATERIALIZE",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_explain_pushdown,
        desc: "EXPLAIN FILTER PUSHDOWN",
        default: true,
        enable_for_item_parsing: true,
    },
    {
        name: enable_index_options,
        desc: "INDEX OPTIONS",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_list_length_max,
        desc: "the list_length_max function",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_list_n_layers,
        desc: "the list_n_layers function",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_list_remove,
        desc: "the list_remove function",
        default: false,
        enable_for_item_parsing: true,
    },
    {

        name: enable_logical_compaction_window,
        desc: "RETAIN HISTORY",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_primary_key_not_enforced,
        desc: "PRIMARY KEY NOT ENFORCED",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_collection_partition_by,
        desc: "PARTITION BY",
        default: true,
        enable_for_item_parsing: true,
    },
    {
        name: enable_multi_worker_storage_persist_sink,
        desc: "multi-worker storage persist sink",
        default: true,
        enable_for_item_parsing: true,
    },
    {
        name: enable_persist_streaming_snapshot_and_fetch,
        desc: "use the new streaming consolidate for snapshot_and_fetch",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_persist_streaming_compaction,
        desc: "use the new streaming consolidate for compaction",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_raise_statement,
        desc: "RAISE statement",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_repeat_row,
        desc: "the repeat_row function",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_replica_targeted_materialized_views,
        desc: "replica-targeted materialized views",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: unsafe_enable_table_check_constraint,
        desc: "CREATE TABLE with a check constraint",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: unsafe_enable_table_foreign_key,
        desc: "CREATE TABLE with a foreign key",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: unsafe_enable_table_keys,
        desc: "CREATE TABLE with a primary key or unique constraint",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: unsafe_enable_unorchestrated_cluster_replicas,
        desc: "unorchestrated cluster replicas",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: unsafe_enable_unstable_dependencies,
        desc: "depending on unstable objects",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_within_timestamp_order_by_in_subscribe,
        desc: "`WITHIN TIMESTAMP ORDER BY ..`",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_cardinality_estimates,
        desc: "join planning with cardinality estimates",
        default: false,
        enable_for_item_parsing: false,
    },
    {
        name: enable_connection_validation_syntax,
        desc: "CREATE CONNECTION .. WITH (VALIDATE) and VALIDATE CONNECTION syntax",
        default: true,
        enable_for_item_parsing: true,
    },
    {
        name: enable_alter_set_cluster,
        desc: "ALTER ... SET CLUSTER syntax",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: unsafe_enable_unsafe_functions,
        desc: "executing potentially dangerous functions",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_managed_cluster_availability_zones,
        desc: "MANAGED, AVAILABILITY ZONES syntax",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: statement_logging_use_reproducible_rng,
        desc: "statement logging with reproducible RNG",
        default: false,
        enable_for_item_parsing: false,
    },
    {
        name: enable_notices_for_index_already_exists,
        desc: "emitting notices for IndexAlreadyExists (doesn't affect EXPLAIN)",
        default: true,
        enable_for_item_parsing: true,
    },
    {
        name: enable_notices_for_index_too_wide_for_literal_constraints,
        desc: "emitting notices for IndexTooWideForLiteralConstraints (doesn't affect EXPLAIN)",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_notices_for_index_empty_key,
        desc: "emitting notices for indexes with an empty key (doesn't affect EXPLAIN)",
        default: true,
        enable_for_item_parsing: true,
    },
    {
        name: enable_notices_for_equals_null,
        desc: "emitting notices for `= NULL` and `<> NULL` comparisons (doesn't affect EXPLAIN)",
        default: true,
        enable_for_item_parsing: true,
    },
    {
        name: enable_alter_swap,
        desc: "the ALTER SWAP feature for objects",
        default: true,
        enable_for_item_parsing: true,
    },
    {
        name: enable_new_outer_join_lowering,
        desc: "new outer join lowering",
        default: true,
        enable_for_item_parsing: false,
    },
    {
        name: enable_time_at_time_zone,
        desc: "use of AT TIME ZONE or timezone() with time type",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_load_generator_counter,
        desc: "Create a LOAD GENERATOR COUNTER",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_load_generator_clock,
        desc: "Create a LOAD GENERATOR CLOCK",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_load_generator_datums,
        desc: "Create a LOAD GENERATOR DATUMS",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_load_generator_key_value,
        desc: "Create a LOAD GENERATOR KEY VALUE",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_expressions_in_limit_syntax,
        desc: "LIMIT <expr> syntax",
        default: true,
        enable_for_item_parsing: true,
    },
    {
        name: enable_mz_notices,
        desc: "Populate the contents of `mz_internal.mz_notices`",
        default: true,
        enable_for_item_parsing: false,
    },
    {
        name: enable_eager_delta_joins,
        desc:
            "eager delta joins",
        default: false,
        enable_for_item_parsing: false,
    },
    {
        name: enable_off_thread_optimization,
        desc: "use off-thread optimization in `CREATE` statements",
        default: true,
        enable_for_item_parsing: false,
    },
    {
        name: enable_refresh_every_mvs,
        desc: "REFRESH EVERY and REFRESH AT materialized views",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_cluster_schedule_refresh,
        desc: "`SCHEDULE = ON REFRESH` cluster option",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_reduce_mfp_fusion,
        desc: "fusion of MFPs in reductions",
        default: true,
        enable_for_item_parsing: false,
    },
    {
        name: enable_worker_core_affinity,
        desc: "set core affinity for replica worker threads",
        default: false,
        enable_for_item_parsing: false,
    },
    {
        name: enable_copy_to_expr,
        desc: "COPY ... TO 's3://...'",
        default: true,
        enable_for_item_parsing: false,
    },
    {
        name: enable_session_timelines,
        desc: "strong session serializable isolation levels",
        default: false,
        enable_for_item_parsing: false,
    },
    {
        name: enable_variadic_left_join_lowering,
        desc: "Enable joint HIR ⇒ MIR lowering of stacks of left joins",
        default: true,
        enable_for_item_parsing: false,
    },
    {
        name: enable_redacted_test_option,
        desc: "Enable useless option to test value redaction",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_letrec_fixpoint_analysis,
        desc: "Enable Lattice-based fixpoint iteration on LetRec nodes in the Analysis framework",
        default: true, // This is just a failsafe switch for the deployment of materialize#25591.
        enable_for_item_parsing: false,
    },
    {
        name: enable_kafka_sink_headers,
        desc: "Enable the HEADERS option for Kafka sinks",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_unlimited_retain_history,
        desc: "Disable limits on RETAIN HISTORY (below 1s default, and 0 disables compaction).",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_envelope_upsert_inline_errors,
        desc: "The VALUE DECODING ERRORS = INLINE option on ENVELOPE UPSERT",
        default: true,
        enable_for_item_parsing: true,
    },
    {
        name: enable_alter_table_add_column,
        desc: "Enable ALTER TABLE ... ADD COLUMN ...",
        default: false,
        enable_for_item_parsing: false,
    },
    {
        name: enable_zero_downtime_cluster_reconfiguration,
        desc: "Enable zero-downtime reconfiguration for alter cluster",
        default: false,
        enable_for_item_parsing: false,
    },
    {
        name: enable_aws_msk_iam_auth,
        desc: "Enable AWS MSK IAM authentication for Kafka connections",
        default: true,
        enable_for_item_parsing: true,
    },
    {
        name: enable_continual_task_create,
        desc: "CREATE CONTINUAL TASK",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_continual_task_transform,
        desc: "CREATE CONTINUAL TASK .. FROM TRANSFORM .. USING",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_continual_task_retain,
        desc: "CREATE CONTINUAL TASK .. FROM RETAIN .. WHILE",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_network_policies,
        desc: "ENABLE NETWORK POLICIES",
        default: true,
        enable_for_item_parsing: true,
    },
    {
        name: enable_create_table_from_source,
        desc: "Whether to allow CREATE TABLE .. FROM SOURCE syntax.",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_copy_from_remote,
        desc: "Whether to allow COPY FROM <url>.",
        default: true,
        enable_for_item_parsing: false,
    },
    {
        name: enable_join_prioritize_arranged,
        desc: "Whether join planning should prioritize already-arranged keys over keys with more fields.",
        default: false,
        enable_for_item_parsing: false,
    },
    {
        name: enable_sql_server_source,
        desc: "Creating a SQL SERVER source",
        default: true,
        enable_for_item_parsing: false,
    },
    {
        name: enable_projection_pushdown_after_relation_cse,
        desc: "Run ProjectionPushdown one more time after the last RelationCSE.",
        default: true,
        enable_for_item_parsing: false,
    },
    {
        name: enable_less_reduce_in_eqprop,
        desc: "Run MSE::reduce in EquivalencePropagation only if reduce_expr changed something.",
        default: true,
        enable_for_item_parsing: false,
    },
    {
        name: enable_dequadratic_eqprop_map,
        desc: "Skip the quadratic part of EquivalencePropagation's handling of Map.",
        default: true,
        enable_for_item_parsing: false,
    },
    {
        name: enable_eq_classes_withholding_errors,
        desc: "Use `EquivalenceClassesWithholdingErrors` instead of raw `EquivalenceClasses` during eq prop for joins.",
        default: true,
        enable_for_item_parsing: false,
    },
    {
        name: enable_fast_path_plan_insights,
        desc: "Enables those plan insight notices that help with getting fast path queries. Don't turn on before #9492 is fixed!",
        default: false,
        enable_for_item_parsing: false,
    },
    {
        name: enable_with_ordinality_legacy_fallback,
        desc: "When the new WITH ORDINALITY implementation can't be used with a table func, whether to fall back to the legacy implementation or error out.",
        default: false,
        enable_for_item_parsing: true,
    },
    {
        name: enable_iceberg_sink,
        desc: "Whether to enable the Iceberg sink.",
        default: true,
        enable_for_item_parsing: true,
    },
    {
        name: enable_frontend_peek_sequencing, // currently, changes only take effect for new sessions
        desc: "Enables the new peek sequencing code, which does most of its work in the Adapter Frontend instead of the Coordinator main task.",
        default: true,
        enable_for_item_parsing: false,
    },
    {
        name: enable_replacement_materialized_views,
        desc: "Whether to enable replacement materialized views.",
        default: true,
        enable_for_item_parsing: true,
    },
    {
        name: enable_cast_elimination,
        desc: "Allow the optimizer to eliminate noop casts between values of equivalent representation types.",
        default: true,
        enable_for_item_parsing: false,
    },
    {
        // Just an escape hatch for the unlikely case that we have some user who is doing such
        // queries. Can be removed after one week in prod.
        // https://github.com/MaterializeInc/database-issues/issues/10004
        name: disallow_unmaterializable_functions_as_of,
        desc: "Prohibits calling unmaterializable functions (except `mz_now`) in AS OF queries.",
        default: true,
        enable_for_item_parsing: false,
    },
);
2260
2261impl From<&super::SystemVars> for OptimizerFeatures {
2262    fn from(vars: &super::SystemVars) -> Self {
2263        Self {
2264            enable_guard_subquery_tablefunc: vars.enable_guard_subquery_tablefunc(),
2265            enable_consolidate_after_union_negate: vars.enable_consolidate_after_union_negate(),
2266            enable_eager_delta_joins: vars.enable_eager_delta_joins(),
2267            enable_new_outer_join_lowering: vars.enable_new_outer_join_lowering(),
2268            enable_reduce_mfp_fusion: vars.enable_reduce_mfp_fusion(),
2269            enable_variadic_left_join_lowering: vars.enable_variadic_left_join_lowering(),
2270            enable_letrec_fixpoint_analysis: vars.enable_letrec_fixpoint_analysis(),
2271            enable_cardinality_estimates: vars.enable_cardinality_estimates(),
2272            persist_fast_path_limit: vars.persist_fast_path_limit(),
2273            reoptimize_imported_views: false,
2274            enable_join_prioritize_arranged: vars.enable_join_prioritize_arranged(),
2275            enable_projection_pushdown_after_relation_cse: vars
2276                .enable_projection_pushdown_after_relation_cse(),
2277            enable_less_reduce_in_eqprop: vars.enable_less_reduce_in_eqprop(),
2278            enable_dequadratic_eqprop_map: vars.enable_dequadratic_eqprop_map(),
2279            enable_eq_classes_withholding_errors: vars.enable_eq_classes_withholding_errors(),
2280            enable_fast_path_plan_insights: vars.enable_fast_path_plan_insights(),
2281            enable_cast_elimination: vars.enable_cast_elimination(),
2282        }
2283    }
2284}
2285
#[cfg(test)]
mod tests {
    use super::*;
    use crate::session::vars::SystemVars;

    /// Checks that no var backing an optimizer feature is flipped by
    /// `SystemVars::enable_for_item_parsing`, i.e. that they all have
    /// `enable_for_item_parsing = false`.
    ///
    /// Plan caching during item parsing relies on this: a cached plan records the optimizer
    /// features it was produced with, and a mismatch on lookup results in a cache miss.
    #[mz_ore::test]
    fn optimizer_features_no_enable_for_item_parsing() {
        // Start from all-false `OptimizerFeatures` and copy each field into the system var of
        // the same name. The destructuring below deliberately has no `..` rest pattern, so
        // adding a new optimizer feature fails to compile here until the test is updated.
        let false_features = OptimizerFeatures::default();
        let OptimizerFeatures {
            enable_eq_classes_withholding_errors,
            enable_guard_subquery_tablefunc,
            enable_consolidate_after_union_negate,
            enable_eager_delta_joins,
            enable_letrec_fixpoint_analysis,
            enable_new_outer_join_lowering,
            enable_reduce_mfp_fusion,
            enable_variadic_left_join_lowering,
            enable_cardinality_estimates,
            persist_fast_path_limit,
            reoptimize_imported_views,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            enable_less_reduce_in_eqprop,
            enable_dequadratic_eqprop_map,
            enable_fast_path_plan_insights,
            enable_cast_elimination,
        } = false_features;

        let mut vars = SystemVars::new();

        // For each identifier, sets the system var with that name to the stringified value of
        // the local binding with that name.
        macro_rules! set_vars {
            ($($var:ident),+ $(,)?) => {
                $(
                    vars.set(stringify!($var), VarInput::Flat(&$var.to_string()))
                        .unwrap();
                )+
            };
        }

        set_vars!(
            enable_eq_classes_withholding_errors,
            enable_guard_subquery_tablefunc,
            enable_consolidate_after_union_negate,
            enable_eager_delta_joins,
            enable_letrec_fixpoint_analysis,
            enable_new_outer_join_lowering,
            enable_reduce_mfp_fusion,
            enable_variadic_left_join_lowering,
            enable_cardinality_estimates,
            persist_fast_path_limit,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            enable_less_reduce_in_eqprop,
            enable_dequadratic_eqprop_map,
            enable_fast_path_plan_insights,
            enable_cast_elimination,
        );
        // This field has no corresponding system var; bind it only to keep the destructuring
        // exhaustive without an unused-variable warning.
        let _ = reoptimize_imported_views;

        // Turning on item-parsing mode must leave the derived optimizer features unchanged.
        vars.enable_for_item_parsing();
        assert_eq!(OptimizerFeatures::from(&vars), false_features);
    }
}