1use mz_dyncfg::{Config, ConfigSet};
14use std::time::Duration;
15
16pub const CLUSTER_SHUTDOWN_GRACE_PERIOD: Config<Duration> = Config::new(
20 "storage_cluster_shutdown_grace_period",
21 Duration::from_secs(10 * 60),
22 "When dataflows observe an invariant violation it is either due to a bug or due to \
23 the cluster being shut down. This configuration defines the amount of time to \
24 wait before panicking the process, which will register the invariant violation.",
25);
26
27pub const DELAY_SOURCES_PAST_REHYDRATION: Config<bool> = Config::new(
32 "storage_dataflow_delay_sources_past_rehydration",
33 true,
35 "Whether or not to delay sources producing values in some scenarios \
36 (namely, upsert) till after rehydration is finished",
37);
38
39pub const SUSPENDABLE_SOURCES: Config<bool> = Config::new(
42 "storage_dataflow_suspendable_sources",
43 true,
44 "Whether storage dataflows should suspend execution while downstream operators are still \
45 processing data.",
46);
47
48pub const STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION: Config<bool> = Config::new(
53 "storage_downgrade_since_during_finalization",
54 true,
56 "When enabled, force-downgrade the controller's since handle on the shard\
57 during shard finalization",
58);
59
60pub const REPLICA_METRICS_HISTORY_RETENTION_INTERVAL: Config<Duration> = Config::new(
62 "replica_metrics_history_retention_interval",
63 Duration::from_secs(60 * 60 * 24 * 30), "The interval of time to keep when truncating the replica metrics history.",
65);
66
67pub const WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL: Config<Duration> = Config::new(
69 "wallclock_lag_history_retention_interval",
70 Duration::from_secs(60 * 60 * 24 * 30), "The interval of time to keep when truncating the wallclock lag history.",
72);
73
74pub const WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL: Config<Duration> = Config::new(
76 "wallclock_global_lag_histogram_retention_interval",
77 Duration::from_secs(60 * 60 * 24 * 30), "The interval of time to keep when truncating the wallclock lag histogram.",
79);
80
81pub const KAFKA_CLIENT_ID_ENRICHMENT_RULES: Config<fn() -> serde_json::Value> = Config::new(
94 "kafka_client_id_enrichment_rules",
95 || serde_json::json!([]),
96 "Rules for enriching the `client.id` property of Kafka clients with additional data.",
97);
98
99pub const KAFKA_POLL_MAX_WAIT: Config<Duration> = Config::new(
102 "kafka_poll_max_wait",
103 Duration::from_secs(1),
104 "The maximum time we will wait before re-polling rdkafka to see if new partitions/data are \
105 available.",
106);
107
108pub static KAFKA_METADATA_FETCH_INTERVAL: Config<Duration> = Config::new(
110 "kafka_default_metadata_fetch_interval",
111 Duration::from_secs(60),
112 "Interval to fetch topic partition metadata.",
113);
114
115pub const KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM: Config<&'static str> =
116 Config::new(
117 "kafka_default_aws_privatelink_endpoint_identification_algorithm",
118 "none",
120 "The value we set for the 'ssl.endpoint.identification.algorithm' option in the Kafka \
121 Connection config. default: 'none'",
122 );
123
124pub const KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS: Config<usize> = Config::new(
125 "kafka_buffered_event_resize_threshold_elements",
126 1000,
127 "In the Kafka sink operator we might need to buffer messages before emitting them. As a \
128 performance optimization we reuse the buffer allocations, but shrink it to retain at \
129 most this number of elements.",
130);
131
132pub const MYSQL_REPLICATION_HEARTBEAT_INTERVAL: Config<Duration> = Config::new(
136 "mysql_replication_heartbeat_interval",
137 Duration::from_secs(30),
138 "Replication heartbeat interval requested from the MySQL server.",
139);
140
141pub const MYSQL_OFFSET_KNOWN_INTERVAL: Config<Duration> = Config::new(
143 "mysql_offset_known_interval",
144 Duration::from_secs(10),
145 "Interval to fetch `offset_known`, from `@gtid_executed`",
146);
147
148pub const PG_FETCH_SLOT_RESUME_LSN_INTERVAL: Config<Duration> = Config::new(
152 "postgres_fetch_slot_resume_lsn_interval",
153 Duration::from_millis(500),
154 "Interval to poll `confirmed_flush_lsn` to get a resumption lsn.",
155);
156
157pub const PG_OFFSET_KNOWN_INTERVAL: Config<Duration> = Config::new(
159 "pg_offset_known_interval",
160 Duration::from_secs(10),
161 "Interval to fetch `offset_known`, from `pg_current_wal_lsn`",
162);
163
164pub const PG_SCHEMA_VALIDATION_INTERVAL: Config<Duration> = Config::new(
166 "pg_schema_validation_interval",
167 Duration::from_secs(15),
168 "Interval to re-validate the schemas of ingested tables.",
169);
170
171pub const ENFORCE_EXTERNAL_ADDRESSES: Config<bool> = Config::new(
176 "storage_enforce_external_addresses",
177 false,
178 "Whether or not to enforce that external connection addresses are global \
179 (not private or local) when resolving them",
180);
181
182pub const STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING: Config<bool> = Config::new(
199 "storage_upsert_prevent_snapshot_buffering",
200 true,
201 "Prevent snapshot buffering in upsert.",
202);
203
204pub const STORAGE_ROCKSDB_USE_MERGE_OPERATOR: Config<bool> = Config::new(
206 "storage_rocksdb_use_merge_operator",
207 true,
208 "Use the native rocksdb merge operator where possible.",
209);
210
211pub const STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING: Config<Option<usize>> = Config::new(
216 "storage_upsert_max_snapshot_batch_buffering",
217 None,
218 "Limit snapshot buffering in upsert.",
219);
220
221pub const STORAGE_ROCKSDB_CLEANUP_TRIES: Config<usize> = Config::new(
225 "storage_rocksdb_cleanup_tries",
226 5,
227 "How many times to try to cleanup old RocksDB DB's on disk before giving up.",
228);
229
230pub const STORAGE_SUSPEND_AND_RESTART_DELAY: Config<Duration> = Config::new(
232 "storage_suspend_and_restart_delay",
233 Duration::from_secs(5),
234 "Delay interval when reconnecting to a source / sink after halt.",
235);
236
237pub const STORAGE_SINK_SNAPSHOT_FRONTIER: Config<bool> = Config::new(
239 "storage_sink_snapshot_frontier",
240 true,
241 "If true, skip fetching the snapshot in the sink once the frontier has advanced.",
242);
243
244pub const STORAGE_RECLOCK_TO_LATEST: Config<bool> = Config::new(
247 "storage_reclock_to_latest",
248 false,
249 "Whether to mint reclock bindings based on the latest probed offset or the latest ingested offset.",
250);
251
252pub const STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT: Config<bool> = Config::new(
254 "storage_use_continual_feedback_upsert",
255 true,
256 "Whether to use the new continual feedback upsert operator.",
257);
258
259pub const STORAGE_SERVER_MAINTENANCE_INTERVAL: Config<Duration> = Config::new(
261 "storage_server_maintenance_interval",
262 Duration::from_millis(10),
263 "The interval at which the storage server performs maintenance tasks. Zero enables maintenance on every iteration.",
264);
265
266pub const SINK_PROGRESS_SEARCH: Config<bool> = Config::new(
268 "storage_sink_progress_search",
269 true,
270 "If set, iteratively search the progress topic for a progress record with increasing lookback.",
271);
272
273pub const SINK_ENSURE_TOPIC_CONFIG: Config<&'static str> = Config::new(
275 "storage_sink_ensure_topic_config",
276 "skip",
277 "If `skip`, don't check the config of existing topics; if `check`, fetch the config and \
278 warn if it does not match the expected configs; if `alter`, attempt to change the upstream to \
279 match the expected configs.",
280);
281
282pub const ORE_OVERFLOWING_BEHAVIOR: Config<&'static str> = Config::new(
284 "ore_overflowing_behavior",
285 "ignore",
286 "Overflow behavior for Overflowing types. One of 'ignore', 'panic', 'soft_panic'.",
287);
288
289pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
291 configs
292 .add(&CLUSTER_SHUTDOWN_GRACE_PERIOD)
293 .add(&DELAY_SOURCES_PAST_REHYDRATION)
294 .add(&ENFORCE_EXTERNAL_ADDRESSES)
295 .add(&KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS)
296 .add(&KAFKA_CLIENT_ID_ENRICHMENT_RULES)
297 .add(&KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM)
298 .add(&KAFKA_METADATA_FETCH_INTERVAL)
299 .add(&KAFKA_POLL_MAX_WAIT)
300 .add(&MYSQL_OFFSET_KNOWN_INTERVAL)
301 .add(&MYSQL_REPLICATION_HEARTBEAT_INTERVAL)
302 .add(&ORE_OVERFLOWING_BEHAVIOR)
303 .add(&PG_FETCH_SLOT_RESUME_LSN_INTERVAL)
304 .add(&PG_OFFSET_KNOWN_INTERVAL)
305 .add(&PG_SCHEMA_VALIDATION_INTERVAL)
306 .add(&REPLICA_METRICS_HISTORY_RETENTION_INTERVAL)
307 .add(&SINK_ENSURE_TOPIC_CONFIG)
308 .add(&SINK_PROGRESS_SEARCH)
309 .add(&STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION)
310 .add(&STORAGE_RECLOCK_TO_LATEST)
311 .add(&STORAGE_ROCKSDB_CLEANUP_TRIES)
312 .add(&STORAGE_ROCKSDB_USE_MERGE_OPERATOR)
313 .add(&STORAGE_SERVER_MAINTENANCE_INTERVAL)
314 .add(&STORAGE_SINK_SNAPSHOT_FRONTIER)
315 .add(&STORAGE_SUSPEND_AND_RESTART_DELAY)
316 .add(&STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING)
317 .add(&STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING)
318 .add(&STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT)
319 .add(&SUSPENDABLE_SOURCES)
320 .add(&WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL)
321 .add(&WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL)
322 .add(&crate::sources::sql_server::CDC_POLL_INTERVAL)
323 .add(&crate::sources::sql_server::CDC_CLEANUP_CHANGE_TABLE)
324 .add(&crate::sources::sql_server::CDC_CLEANUP_CHANGE_TABLE_MAX_DELETES)
325 .add(&crate::sources::sql_server::SNAPSHOT_MAX_LSN_WAIT)
326 .add(&crate::sources::sql_server::SNAPSHOT_PROGRESS_REPORT_INTERVAL)
327 .add(&crate::sources::sql_server::OFFSET_KNOWN_INTERVAL)
328}