mz_storage_types/
dyncfgs.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Dyncfgs used by the storage layer. Despite their name, these can be used
11//! "statically" during rendering, or dynamically within timely operators.
12
13use mz_dyncfg::{Config, ConfigSet};
14use std::time::Duration;
15
16/// When dataflows observe an invariant violation it is either due to a bug or due to the cluster
17/// being shut down. This configuration defines the amount of time to wait before panicking the
18/// process, which will register the invariant violation.
19pub const CLUSTER_SHUTDOWN_GRACE_PERIOD: Config<Duration> = Config::new(
20    "storage_cluster_shutdown_grace_period",
21    Duration::from_secs(10 * 60),
22    "When dataflows observe an invariant violation it is either due to a bug or due to \
23        the cluster being shut down. This configuration defines the amount of time to \
24        wait before panicking the process, which will register the invariant violation.",
25);
26
27// Flow control
28
29/// Whether rendering should use `mz_join_core` rather than DD's `JoinCore::join_core`.
30/// Configuration for basic hydration backpressure.
31pub const DELAY_SOURCES_PAST_REHYDRATION: Config<bool> = Config::new(
32    "storage_dataflow_delay_sources_past_rehydration",
33    // This was original `false`, but it is not enabled everywhere.
34    true,
35    "Whether or not to delay sources producing values in some scenarios \
36        (namely, upsert) till after rehydration is finished",
37);
38
39/// Whether storage dataflows should suspend execution while downstream operators are still
40/// processing data.
41pub const SUSPENDABLE_SOURCES: Config<bool> = Config::new(
42    "storage_dataflow_suspendable_sources",
43    true,
44    "Whether storage dataflows should suspend execution while downstream operators are still \
45        processing data.",
46);
47
48// Controller
49
50/// When enabled, force-downgrade the controller's since handle on the shard
51/// during shard finalization.
52pub const STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION: Config<bool> = Config::new(
53    "storage_downgrade_since_during_finalization",
54    // This was original `false`, but it is not enabled everywhere.
55    true,
56    "When enabled, force-downgrade the controller's since handle on the shard\
57    during shard finalization",
58);
59
60/// The interval of time to keep when truncating the replica metrics history.
61pub const REPLICA_METRICS_HISTORY_RETENTION_INTERVAL: Config<Duration> = Config::new(
62    "replica_metrics_history_retention_interval",
63    Duration::from_secs(60 * 60 * 24 * 30), // 30 days
64    "The interval of time to keep when truncating the replica metrics history.",
65);
66
67/// The interval of time to keep when truncating the wallclock lag history.
68pub const WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL: Config<Duration> = Config::new(
69    "wallclock_lag_history_retention_interval",
70    Duration::from_secs(60 * 60 * 24 * 30), // 30 days
71    "The interval of time to keep when truncating the wallclock lag history.",
72);
73
74/// The interval of time to keep when truncating the wallclock lag histogram.
75pub const WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL: Config<Duration> = Config::new(
76    "wallclock_global_lag_histogram_retention_interval",
77    Duration::from_secs(60 * 60 * 24 * 30), // 30 days
78    "The interval of time to keep when truncating the wallclock lag histogram.",
79);
80
81// Kafka
82
83/// Rules for enriching the `client.id` property of Kafka clients with
84/// additional data.
85///
86/// The configuration value must be a JSON array of objects containing keys
87/// named `pattern` and `payload`, both of type string. Rules are checked in the
88/// order they are defined. The rule's pattern must be a regular expression
89/// understood by the Rust `regex` crate. If the rule's pattern matches the
90/// address of any broker in the connection, then the payload is appended to the
91/// client ID. A rule's payload is always prefixed with `-`, to separate it from
92/// the preceding data in the client ID.
93pub const KAFKA_CLIENT_ID_ENRICHMENT_RULES: Config<fn() -> serde_json::Value> = Config::new(
94    "kafka_client_id_enrichment_rules",
95    || serde_json::json!([]),
96    "Rules for enriching the `client.id` property of Kafka clients with additional data.",
97);
98
99/// The maximum time we will wait before re-polling rdkafka to see if new partitions/data are
100/// available.
101pub const KAFKA_POLL_MAX_WAIT: Config<Duration> = Config::new(
102    "kafka_poll_max_wait",
103    Duration::from_secs(1),
104    "The maximum time we will wait before re-polling rdkafka to see if new partitions/data are \
105    available.",
106);
107
108/// Interval to fetch topic partition metadata.
109pub static KAFKA_METADATA_FETCH_INTERVAL: Config<Duration> = Config::new(
110    "kafka_default_metadata_fetch_interval",
111    Duration::from_secs(60),
112    "Interval to fetch topic partition metadata.",
113);
114
115pub const KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM: Config<&'static str> =
116    Config::new(
117        "kafka_default_aws_privatelink_endpoint_identification_algorithm",
118        // Default to no hostname verification, which is the default in versions of `librdkafka <1.9.2`.
119        "none",
120        "The value we set for the 'ssl.endpoint.identification.algorithm' option in the Kafka \
121    Connection config. default: 'none'",
122    );
123
124pub const KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS: Config<usize> = Config::new(
125    "kafka_buffered_event_resize_threshold_elements",
126    1000,
127    "In the Kafka sink operator we might need to buffer messages before emitting them. As a \
128        performance optimization we reuse the buffer allocations, but shrink it to retain at \
129        most this number of elements.",
130);
131
132// MySQL
133
134/// Replication heartbeat interval requested from the MySQL server.
135pub const MYSQL_REPLICATION_HEARTBEAT_INTERVAL: Config<Duration> = Config::new(
136    "mysql_replication_heartbeat_interval",
137    Duration::from_secs(30),
138    "Replication heartbeat interval requested from the MySQL server.",
139);
140
141/// Interval to fetch `offset_known`, from `@gtid_executed`
142pub const MYSQL_OFFSET_KNOWN_INTERVAL: Config<Duration> = Config::new(
143    "mysql_offset_known_interval",
144    Duration::from_secs(10),
145    "Interval to fetch `offset_known`, from `@gtid_executed`",
146);
147
148// Postgres
149
150/// Interval to poll `confirmed_flush_lsn` to get a resumption lsn.
151pub const PG_FETCH_SLOT_RESUME_LSN_INTERVAL: Config<Duration> = Config::new(
152    "postgres_fetch_slot_resume_lsn_interval",
153    Duration::from_millis(500),
154    "Interval to poll `confirmed_flush_lsn` to get a resumption lsn.",
155);
156
157/// Interval to fetch `offset_known`, from `pg_current_wal_lsn`
158pub const PG_OFFSET_KNOWN_INTERVAL: Config<Duration> = Config::new(
159    "pg_offset_known_interval",
160    Duration::from_secs(10),
161    "Interval to fetch `offset_known`, from `pg_current_wal_lsn`",
162);
163
164/// Interval to re-validate the schemas of ingested tables.
165pub const PG_SCHEMA_VALIDATION_INTERVAL: Config<Duration> = Config::new(
166    "pg_schema_validation_interval",
167    Duration::from_secs(15),
168    "Interval to re-validate the schemas of ingested tables.",
169);
170
171// Networking
172
173/// Whether or not to enforce that external connection addresses are global
174/// (not private or local) when resolving them.
175pub const ENFORCE_EXTERNAL_ADDRESSES: Config<bool> = Config::new(
176    "storage_enforce_external_addresses",
177    false,
178    "Whether or not to enforce that external connection addresses are global \
179          (not private or local) when resolving them",
180);
181
182// Upsert
183
184/// Whether or not to prevent buffering the entire _upstream_ snapshot in
185/// memory when processing it in memory. This is generally understood to reduce
186/// memory consumption.
187///
188/// When false, in general the memory utilization while processing the snapshot is:
189/// # of snapshot updates + (# of unique keys in snapshot * N), where N is some small
190/// integer number of buffers
191///
192/// When true, in general the memory utilization while processing the snapshot is:
193/// # of snapshot updates + (RocksDB buffers + # of keys in batch produced by upstream) * # of
194/// workers.
195///
196/// Without hydration flow control, which is not yet implemented, there are workloads that may
197/// cause the latter to use more memory, which is why we offer this configuration.
198pub const STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING: Config<bool> = Config::new(
199    "storage_upsert_prevent_snapshot_buffering",
200    true,
201    "Prevent snapshot buffering in upsert.",
202);
203
204/// Whether to enable the merge operator in upsert for the RocksDB backend.
205pub const STORAGE_ROCKSDB_USE_MERGE_OPERATOR: Config<bool> = Config::new(
206    "storage_rocksdb_use_merge_operator",
207    true,
208    "Use the native rocksdb merge operator where possible.",
209);
210
211/// If `storage_upsert_prevent_snapshot_buffering` is true, this prevents the upsert
212/// operator from buffering too many events from the upstream snapshot. In the absence
213/// of hydration flow control, this could prevent certain workloads from causing egregiously
214/// large writes to RocksDB.
215pub const STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING: Config<Option<usize>> = Config::new(
216    "storage_upsert_max_snapshot_batch_buffering",
217    None,
218    "Limit snapshot buffering in upsert.",
219);
220
221// RocksDB
222
223/// How many times to try to cleanup old RocksDB DB's on disk before giving up.
224pub const STORAGE_ROCKSDB_CLEANUP_TRIES: Config<usize> = Config::new(
225    "storage_rocksdb_cleanup_tries",
226    5,
227    "How many times to try to cleanup old RocksDB DB's on disk before giving up.",
228);
229
230/// Delay interval when reconnecting to a source / sink after halt.
231pub const STORAGE_SUSPEND_AND_RESTART_DELAY: Config<Duration> = Config::new(
232    "storage_suspend_and_restart_delay",
233    Duration::from_secs(5),
234    "Delay interval when reconnecting to a source / sink after halt.",
235);
236
237/// If true, skip fetching the snapshot in the sink once the frontier has advanced.
238pub const STORAGE_SINK_SNAPSHOT_FRONTIER: Config<bool> = Config::new(
239    "storage_sink_snapshot_frontier",
240    true,
241    "If true, skip fetching the snapshot in the sink once the frontier has advanced.",
242);
243
244/// Whether to mint reclock bindings based on the latest probed frontier or the currently ingested
245/// frontier.
246pub const STORAGE_RECLOCK_TO_LATEST: Config<bool> = Config::new(
247    "storage_reclock_to_latest",
248    false,
249    "Whether to mint reclock bindings based on the latest probed offset or the latest ingested offset.",
250);
251
252/// Whether to use the new continual feedback upsert operator.
253pub const STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT: Config<bool> = Config::new(
254    "storage_use_continual_feedback_upsert",
255    true,
256    "Whether to use the new continual feedback upsert operator.",
257);
258
259/// The interval at which the storage server performs maintenance tasks.
260pub const STORAGE_SERVER_MAINTENANCE_INTERVAL: Config<Duration> = Config::new(
261    "storage_server_maintenance_interval",
262    Duration::from_millis(10),
263    "The interval at which the storage server performs maintenance tasks. Zero enables maintenance on every iteration.",
264);
265
266/// If set, iteratively search the progress topic for a progress record with increasing lookback.
267pub const SINK_PROGRESS_SEARCH: Config<bool> = Config::new(
268    "storage_sink_progress_search",
269    true,
270    "If set, iteratively search the progress topic for a progress record with increasing lookback.",
271);
272
273/// Configure how to behave when trying to create an existing topic with specified configs.
274pub const SINK_ENSURE_TOPIC_CONFIG: Config<&'static str> = Config::new(
275    "storage_sink_ensure_topic_config",
276    "skip",
277    "If `skip`, don't check the config of existing topics; if `check`, fetch the config and \
278    warn if it does not match the expected configs; if `alter`, attempt to change the upstream to \
279    match the expected configs.",
280);
281
282/// Configure mz-ore overflowing type behavior.
283pub const ORE_OVERFLOWING_BEHAVIOR: Config<&'static str> = Config::new(
284    "ore_overflowing_behavior",
285    "ignore",
286    "Overflow behavior for Overflowing types. One of 'ignore', 'panic', 'soft_panic'.",
287);
288
289/// Adds the full set of all storage `Config`s.
290pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
291    configs
292        .add(&CLUSTER_SHUTDOWN_GRACE_PERIOD)
293        .add(&DELAY_SOURCES_PAST_REHYDRATION)
294        .add(&ENFORCE_EXTERNAL_ADDRESSES)
295        .add(&KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS)
296        .add(&KAFKA_CLIENT_ID_ENRICHMENT_RULES)
297        .add(&KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM)
298        .add(&KAFKA_METADATA_FETCH_INTERVAL)
299        .add(&KAFKA_POLL_MAX_WAIT)
300        .add(&MYSQL_OFFSET_KNOWN_INTERVAL)
301        .add(&MYSQL_REPLICATION_HEARTBEAT_INTERVAL)
302        .add(&ORE_OVERFLOWING_BEHAVIOR)
303        .add(&PG_FETCH_SLOT_RESUME_LSN_INTERVAL)
304        .add(&PG_OFFSET_KNOWN_INTERVAL)
305        .add(&PG_SCHEMA_VALIDATION_INTERVAL)
306        .add(&REPLICA_METRICS_HISTORY_RETENTION_INTERVAL)
307        .add(&SINK_ENSURE_TOPIC_CONFIG)
308        .add(&SINK_PROGRESS_SEARCH)
309        .add(&STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION)
310        .add(&STORAGE_RECLOCK_TO_LATEST)
311        .add(&STORAGE_ROCKSDB_CLEANUP_TRIES)
312        .add(&STORAGE_ROCKSDB_USE_MERGE_OPERATOR)
313        .add(&STORAGE_SERVER_MAINTENANCE_INTERVAL)
314        .add(&STORAGE_SINK_SNAPSHOT_FRONTIER)
315        .add(&STORAGE_SUSPEND_AND_RESTART_DELAY)
316        .add(&STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING)
317        .add(&STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING)
318        .add(&STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT)
319        .add(&SUSPENDABLE_SOURCES)
320        .add(&WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL)
321        .add(&WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL)
322        .add(&crate::sources::sql_server::CDC_POLL_INTERVAL)
323        .add(&crate::sources::sql_server::CDC_CLEANUP_CHANGE_TABLE)
324        .add(&crate::sources::sql_server::CDC_CLEANUP_CHANGE_TABLE_MAX_DELETES)
325        .add(&crate::sources::sql_server::SNAPSHOT_MAX_LSN_WAIT)
326        .add(&crate::sources::sql_server::SNAPSHOT_PROGRESS_REPORT_INTERVAL)
327        .add(&crate::sources::sql_server::OFFSET_KNOWN_INTERVAL)
328}