1use mz_dyncfg::{Config, ConfigSet};
14use std::time::Duration;
15
/// Grace period before a dataflow-observed invariant violation is escalated to
/// a process panic.
///
/// An invariant violation is either a bug or a side effect of the cluster
/// being shut down; waiting this long lets an in-progress shutdown win the
/// race before we panic (which would register the violation).
pub const CLUSTER_SHUTDOWN_GRACE_PERIOD: Config<Duration> = Config::new(
    "storage_cluster_shutdown_grace_period",
    Duration::from_secs(10 * 60),
    "When dataflows observe an invariant violation it is either due to a bug or due to \
    the cluster being shut down. This configuration defines the amount of time to \
    wait before panicking the process, which will register the invariant violation.",
);
26
/// Whether to delay sources producing values in some scenarios (namely,
/// upsert) until after rehydration is finished.
pub const DELAY_SOURCES_PAST_REHYDRATION: Config<bool> = Config::new(
    "storage_dataflow_delay_sources_past_rehydration",
    true,
    "Whether or not to delay sources producing values in some scenarios \
    (namely, upsert) till after rehydration is finished",
);

/// Whether storage dataflows should suspend execution while downstream
/// operators are still processing data.
pub const SUSPENDABLE_SOURCES: Config<bool> = Config::new(
    "storage_dataflow_suspendable_sources",
    true,
    "Whether storage dataflows should suspend execution while downstream operators are still \
    processing data.",
);
47
48pub const STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION: Config<bool> = Config::new(
53 "storage_downgrade_since_during_finalization",
54 true,
56 "When enabled, force-downgrade the controller's since handle on the shard\
57 during shard finalization",
58);
59
60pub const REPLICA_METRICS_HISTORY_RETENTION_INTERVAL: Config<Duration> = Config::new(
62 "replica_metrics_history_retention_interval",
63 Duration::from_secs(60 * 60 * 24 * 30), "The interval of time to keep when truncating the replica metrics history.",
65);
66
67pub const WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL: Config<Duration> = Config::new(
69 "wallclock_lag_history_retention_interval",
70 Duration::from_secs(60 * 60 * 24 * 30), "The interval of time to keep when truncating the wallclock lag history.",
72);
73
74pub const WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL: Config<Duration> = Config::new(
76 "wallclock_global_lag_histogram_retention_interval",
77 Duration::from_secs(60 * 60 * 24 * 30), "The interval of time to keep when truncating the wallclock lag histogram.",
79);
80
/// Rules for enriching the `client.id` property of Kafka clients with
/// additional data. The default is an empty JSON array (no rules).
pub const KAFKA_CLIENT_ID_ENRICHMENT_RULES: Config<fn() -> serde_json::Value> = Config::new(
    "kafka_client_id_enrichment_rules",
    || serde_json::json!([]),
    "Rules for enriching the `client.id` property of Kafka clients with additional data.",
);

/// The maximum time to wait before re-polling rdkafka to see if new
/// partitions/data are available.
pub const KAFKA_POLL_MAX_WAIT: Config<Duration> = Config::new(
    "kafka_poll_max_wait",
    Duration::from_secs(1),
    "The maximum time we will wait before re-polling rdkafka to see if new partitions/data are \
    available.",
);
107
/// The value set for the `ssl.endpoint.identification.algorithm` option in
/// Kafka connection configs (default: `none`).
pub const KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM: Config<&'static str> =
    Config::new(
        "kafka_default_aws_privatelink_endpoint_identification_algorithm",
        "none",
        "The value we set for the 'ssl.endpoint.identification.algorithm' option in the Kafka \
        Connection config. default: 'none'",
    );

/// Maximum number of elements retained when shrinking the reused message
/// buffer in the Kafka sink operator.
pub const KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS: Config<usize> = Config::new(
    "kafka_buffered_event_resize_threshold_elements",
    1000,
    "In the Kafka sink operator we might need to buffer messages before emitting them. As a \
    performance optimization we reuse the buffer allocations, but shrink it to retain at \
    most this number of elements.",
);
124
/// Sets `retry.backoff.ms` in librdkafka for sources and sinks.
pub const KAFKA_RETRY_BACKOFF: Config<Duration> = Config::new(
    "kafka_retry_backoff",
    Duration::from_millis(100),
    "Sets retry.backoff.ms in librdkafka for sources and sinks.",
);

/// Sets `retry.backoff.max.ms` in librdkafka for sources and sinks.
pub const KAFKA_RETRY_BACKOFF_MAX: Config<Duration> = Config::new(
    "kafka_retry_backoff_max",
    Duration::from_secs(1),
    "Sets retry.backoff.max.ms in librdkafka for sources and sinks.",
);

/// Sets `reconnect.backoff.ms` in librdkafka for sources and sinks.
pub const KAFKA_RECONNECT_BACKOFF: Config<Duration> = Config::new(
    "kafka_reconnect_backoff",
    Duration::from_millis(100),
    "Sets reconnect.backoff.ms in librdkafka for sources and sinks.",
);

/// Sets `reconnect.backoff.max.ms` in librdkafka for sources and sinks.
pub const KAFKA_RECONNECT_BACKOFF_MAX: Config<Duration> = Config::new(
    "kafka_reconnect_backoff_max",
    Duration::from_secs(30),
    "Sets reconnect.backoff.max.ms in librdkafka for sources and sinks.",
);
158
/// Sets `message.max.bytes` in librdkafka for Kafka sink producers.
pub const KAFKA_SINK_MESSAGE_MAX_BYTES: Config<usize> = Config::new(
    "kafka_sink_message_max_bytes",
    1_000_000,
    "Sets message.max.bytes in librdkafka for Kafka sink producers.",
);

/// Sets `batch.size` in librdkafka for Kafka sink producers.
pub const KAFKA_SINK_BATCH_SIZE: Config<usize> = Config::new(
    "kafka_sink_batch_size",
    1_000_000,
    "Sets batch.size in librdkafka for Kafka sink producers.",
);

/// Sets `batch.num.messages` in librdkafka for Kafka sink producers.
pub const KAFKA_SINK_BATCH_NUM_MESSAGES: Config<usize> = Config::new(
    "kafka_sink_batch_num_messages",
    10_000,
    "Sets batch.num.messages in librdkafka for Kafka sink producers.",
);
190
/// Replication heartbeat interval requested from the MySQL server.
pub const MYSQL_REPLICATION_HEARTBEAT_INTERVAL: Config<Duration> = Config::new(
    "mysql_replication_heartbeat_interval",
    Duration::from_secs(30),
    "Replication heartbeat interval requested from the MySQL server.",
);

/// Interval at which to poll `confirmed_flush_lsn` to get a resumption LSN
/// for a Postgres source.
pub const PG_FETCH_SLOT_RESUME_LSN_INTERVAL: Config<Duration> = Config::new(
    "postgres_fetch_slot_resume_lsn_interval",
    Duration::from_millis(500),
    "Interval to poll `confirmed_flush_lsn` to get a resumption lsn.",
);

/// Interval at which to re-validate the schemas of ingested Postgres tables.
pub const PG_SCHEMA_VALIDATION_INTERVAL: Config<Duration> = Config::new(
    "pg_schema_validation_interval",
    Duration::from_secs(15),
    "Interval to re-validate the schemas of ingested tables.",
);
215
216pub static PG_SOURCE_VALIDATE_TIMELINE: Config<bool> = Config::new(
222 "pg_source_validate_timeline",
223 true,
224 "Whether to treat a timeline switch as a definite error",
225);
226
227pub static SQL_SERVER_SOURCE_VALIDATE_RESTORE_HISTORY: Config<bool> = Config::new(
233 "sql_server_source_validate_restore_history",
234 true,
235 "Whether to treat a restore history change as a definite error",
236);
237
/// Whether to enforce that external connection addresses are global (not
/// private or local) when resolving them.
pub const ENFORCE_EXTERNAL_ADDRESSES: Config<bool> = Config::new(
    "storage_enforce_external_addresses",
    false,
    "Whether or not to enforce that external connection addresses are global \
    (not private or local) when resolving them",
);

/// Whether to prevent snapshot buffering in upsert.
pub const STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING: Config<bool> = Config::new(
    "storage_upsert_prevent_snapshot_buffering",
    true,
    "Prevent snapshot buffering in upsert.",
);

/// Whether to use the native RocksDB merge operator where possible.
pub const STORAGE_ROCKSDB_USE_MERGE_OPERATOR: Config<bool> = Config::new(
    "storage_rocksdb_use_merge_operator",
    true,
    "Use the native rocksdb merge operator where possible.",
);
277
/// Optional limit on snapshot buffering in upsert; `None` means unlimited.
pub const STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING: Config<Option<usize>> = Config::new(
    "storage_upsert_max_snapshot_batch_buffering",
    None,
    "Limit snapshot buffering in upsert.",
);

/// How many times to try to clean up old RocksDB DBs on disk before giving up.
pub const STORAGE_ROCKSDB_CLEANUP_TRIES: Config<usize> = Config::new(
    "storage_rocksdb_cleanup_tries",
    5,
    "How many times to try to cleanup old RocksDB DB's on disk before giving up.",
);

/// Delay interval when reconnecting to a source / sink after halt.
pub const STORAGE_SUSPEND_AND_RESTART_DELAY: Config<Duration> = Config::new(
    "storage_suspend_and_restart_delay",
    Duration::from_secs(5),
    "Delay interval when reconnecting to a source / sink after halt.",
);

/// Whether to use the new continual feedback upsert operator.
pub const STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT: Config<bool> = Config::new(
    "storage_use_continual_feedback_upsert",
    true,
    "Whether to use the new continual feedback upsert operator.",
);

/// Whether to use the v2 upsert operator.
pub const ENABLE_UPSERT_V2: Config<bool> = Config::new(
    "enable_upsert_v2",
    false,
    "Whether to use the v2 upsert operator.",
);
317
318pub const STORAGE_SERVER_MAINTENANCE_INTERVAL: Config<Duration> = Config::new(
320 "storage_server_maintenance_interval",
321 Duration::from_millis(10),
322 "The interval at which the storage server performs maintenance tasks. Zero enables maintenance on every iteration.",
323);
324
/// If set, iteratively search the progress topic for a progress record with
/// increasing lookback.
pub const SINK_PROGRESS_SEARCH: Config<bool> = Config::new(
    "storage_sink_progress_search",
    true,
    "If set, iteratively search the progress topic for a progress record with increasing lookback.",
);

/// How to treat the config of existing sink topics: `skip` (don't check),
/// `check` (fetch and warn on mismatch), or `alter` (attempt to change the
/// upstream to match).
pub const SINK_ENSURE_TOPIC_CONFIG: Config<&'static str> = Config::new(
    "storage_sink_ensure_topic_config",
    "skip",
    "If `skip`, don't check the config of existing topics; if `check`, fetch the config and \
    warn if it does not match the expected configs; if `alter`, attempt to change the upstream to \
    match the expected configs.",
);

/// Overflow behavior for Overflowing types: one of `ignore`, `panic`,
/// `soft_panic`.
pub const ORE_OVERFLOWING_BEHAVIOR: Config<&'static str> = Config::new(
    "ore_overflowing_behavior",
    "soft_panic",
    "Overflow behavior for Overflowing types. One of 'ignore', 'panic', 'soft_panic'.",
);
347
348pub const STATISTICS_RETENTION_DURATION: Config<Duration> = Config::new(
354 "storage_statistics_retention_duration",
355 Duration::from_secs(86_400), "The time after which we delete per replica statistics (for sources and sinks) after there have been no updates.",
357);
358
359pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
361 configs
362 .add(&CLUSTER_SHUTDOWN_GRACE_PERIOD)
363 .add(&DELAY_SOURCES_PAST_REHYDRATION)
364 .add(&ENFORCE_EXTERNAL_ADDRESSES)
365 .add(&KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS)
366 .add(&KAFKA_CLIENT_ID_ENRICHMENT_RULES)
367 .add(&KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM)
368 .add(&KAFKA_POLL_MAX_WAIT)
369 .add(&KAFKA_RETRY_BACKOFF)
370 .add(&KAFKA_RETRY_BACKOFF_MAX)
371 .add(&KAFKA_RECONNECT_BACKOFF)
372 .add(&KAFKA_RECONNECT_BACKOFF_MAX)
373 .add(&KAFKA_SINK_MESSAGE_MAX_BYTES)
374 .add(&KAFKA_SINK_BATCH_SIZE)
375 .add(&KAFKA_SINK_BATCH_NUM_MESSAGES)
376 .add(&MYSQL_REPLICATION_HEARTBEAT_INTERVAL)
377 .add(&ORE_OVERFLOWING_BEHAVIOR)
378 .add(&PG_FETCH_SLOT_RESUME_LSN_INTERVAL)
379 .add(&PG_SCHEMA_VALIDATION_INTERVAL)
380 .add(&PG_SOURCE_VALIDATE_TIMELINE)
381 .add(&REPLICA_METRICS_HISTORY_RETENTION_INTERVAL)
382 .add(&SINK_ENSURE_TOPIC_CONFIG)
383 .add(&SINK_PROGRESS_SEARCH)
384 .add(&SQL_SERVER_SOURCE_VALIDATE_RESTORE_HISTORY)
385 .add(&STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION)
386 .add(&STORAGE_ROCKSDB_CLEANUP_TRIES)
387 .add(&STORAGE_ROCKSDB_USE_MERGE_OPERATOR)
388 .add(&STORAGE_SERVER_MAINTENANCE_INTERVAL)
389 .add(&STORAGE_SUSPEND_AND_RESTART_DELAY)
390 .add(&STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING)
391 .add(&STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING)
392 .add(&STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT)
393 .add(&ENABLE_UPSERT_V2)
394 .add(&SUSPENDABLE_SOURCES)
395 .add(&WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL)
396 .add(&WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL)
397 .add(&crate::sources::sql_server::CDC_CLEANUP_CHANGE_TABLE)
398 .add(&crate::sources::sql_server::CDC_CLEANUP_CHANGE_TABLE_MAX_DELETES)
399 .add(&crate::sources::sql_server::MAX_LSN_WAIT)
400 .add(&crate::sources::sql_server::SNAPSHOT_PROGRESS_REPORT_INTERVAL)
401 .add(&STATISTICS_RETENTION_DURATION)
402}