Skip to main content

mz_storage_types/
dyncfgs.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Dyncfgs used by the storage layer. Despite their name, these can be used
11//! "statically" during rendering, or dynamically within timely operators.
12
13use mz_dyncfg::{Config, ConfigSet};
14use std::time::Duration;
15
16/// When dataflows observe an invariant violation it is either due to a bug or due to the cluster
17/// being shut down. This configuration defines the amount of time to wait before panicking the
18/// process, which will register the invariant violation.
19pub const CLUSTER_SHUTDOWN_GRACE_PERIOD: Config<Duration> = Config::new(
20    "storage_cluster_shutdown_grace_period",
21    Duration::from_secs(10 * 60),
22    "When dataflows observe an invariant violation it is either due to a bug or due to \
23        the cluster being shut down. This configuration defines the amount of time to \
24        wait before panicking the process, which will register the invariant violation.",
25);
26
27// Flow control
28
29/// Whether rendering should use `mz_join_core` rather than DD's `JoinCore::join_core`.
30/// Configuration for basic hydration backpressure.
31pub const DELAY_SOURCES_PAST_REHYDRATION: Config<bool> = Config::new(
32    "storage_dataflow_delay_sources_past_rehydration",
33    // This was original `false`, but it is not enabled everywhere.
34    true,
35    "Whether or not to delay sources producing values in some scenarios \
36        (namely, upsert) till after rehydration is finished",
37);
38
39/// Whether storage dataflows should suspend execution while downstream operators are still
40/// processing data.
41pub const SUSPENDABLE_SOURCES: Config<bool> = Config::new(
42    "storage_dataflow_suspendable_sources",
43    true,
44    "Whether storage dataflows should suspend execution while downstream operators are still \
45        processing data.",
46);
47
48// Controller
49
50/// When enabled, force-downgrade the controller's since handle on the shard
51/// during shard finalization.
52pub const STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION: Config<bool> = Config::new(
53    "storage_downgrade_since_during_finalization",
54    // This was original `false`, but it is not enabled everywhere.
55    true,
56    "When enabled, force-downgrade the controller's since handle on the shard\
57    during shard finalization",
58);
59
60/// The interval of time to keep when truncating the replica metrics history.
61pub const REPLICA_METRICS_HISTORY_RETENTION_INTERVAL: Config<Duration> = Config::new(
62    "replica_metrics_history_retention_interval",
63    Duration::from_secs(60 * 60 * 24 * 30), // 30 days
64    "The interval of time to keep when truncating the replica metrics history.",
65);
66
67/// The interval of time to keep when truncating the wallclock lag history.
68pub const WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL: Config<Duration> = Config::new(
69    "wallclock_lag_history_retention_interval",
70    Duration::from_secs(60 * 60 * 24 * 30), // 30 days
71    "The interval of time to keep when truncating the wallclock lag history.",
72);
73
74/// The interval of time to keep when truncating the wallclock lag histogram.
75pub const WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL: Config<Duration> = Config::new(
76    "wallclock_global_lag_histogram_retention_interval",
77    Duration::from_secs(60 * 60 * 24 * 30), // 30 days
78    "The interval of time to keep when truncating the wallclock lag histogram.",
79);
80
81// Kafka
82
83/// Rules for enriching the `client.id` property of Kafka clients with
84/// additional data.
85///
86/// The configuration value must be a JSON array of objects containing keys
87/// named `pattern` and `payload`, both of type string. Rules are checked in the
88/// order they are defined. The rule's pattern must be a regular expression
89/// understood by the Rust `regex` crate. If the rule's pattern matches the
90/// address of any broker in the connection, then the payload is appended to the
91/// client ID. A rule's payload is always prefixed with `-`, to separate it from
92/// the preceding data in the client ID.
93pub const KAFKA_CLIENT_ID_ENRICHMENT_RULES: Config<fn() -> serde_json::Value> = Config::new(
94    "kafka_client_id_enrichment_rules",
95    || serde_json::json!([]),
96    "Rules for enriching the `client.id` property of Kafka clients with additional data.",
97);
98
99/// The maximum time we will wait before re-polling rdkafka to see if new partitions/data are
100/// available.
101pub const KAFKA_POLL_MAX_WAIT: Config<Duration> = Config::new(
102    "kafka_poll_max_wait",
103    Duration::from_secs(1),
104    "The maximum time we will wait before re-polling rdkafka to see if new partitions/data are \
105    available.",
106);
107
108/// Whether to check the low watermark for Kafka sources and error if the start offset/resume
109/// upper has been compacted away.
110pub const KAFKA_LOW_WATERMARK_CHECK: Config<bool> = Config::new(
111    "kafka_low_watermark_check",
112    true,
113    "Whether to check the low watermark for Kafka sources and error if the start \
114    offset/resume upper has been compacted away.",
115);
116
117pub const KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM: Config<&'static str> =
118    Config::new(
119        "kafka_default_aws_privatelink_endpoint_identification_algorithm",
120        // Default to no hostname verification, which is the default in versions of `librdkafka <1.9.2`.
121        "none",
122        "The value we set for the 'ssl.endpoint.identification.algorithm' option in the Kafka \
123    Connection config. default: 'none'",
124    );
125
126pub const KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS: Config<usize> = Config::new(
127    "kafka_buffered_event_resize_threshold_elements",
128    1000,
129    "In the Kafka sink operator we might need to buffer messages before emitting them. As a \
130        performance optimization we reuse the buffer allocations, but shrink it to retain at \
131        most this number of elements.",
132);
133
134/// Sets retry.backoff.ms in librdkafka for sources and sinks.
135/// See <https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html>
136pub const KAFKA_RETRY_BACKOFF: Config<Duration> = Config::new(
137    "kafka_retry_backoff",
138    Duration::from_millis(100),
139    "Sets retry.backoff.ms in librdkafka for sources and sinks.",
140);
141
142/// Sets retry.backoff.max.ms in librdkafka for sources and sinks.
143/// See <https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html>
144pub const KAFKA_RETRY_BACKOFF_MAX: Config<Duration> = Config::new(
145    "kafka_retry_backoff_max",
146    Duration::from_secs(1),
147    "Sets retry.backoff.max.ms in librdkafka for sources and sinks.",
148);
149
150/// Sets reconnect.backoff.ms in librdkafka for sources and sinks.
151/// See <https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html>
152pub const KAFKA_RECONNECT_BACKOFF: Config<Duration> = Config::new(
153    "kafka_reconnect_backoff",
154    Duration::from_millis(100),
155    "Sets reconnect.backoff.ms in librdkafka for sources and sinks.",
156);
157
158/// Sets reconnect.backoff.max.ms in librdkafka for sources and sinks.
159/// We default to 30s instead of 10s to avoid constant reconnection attempts in the event of
160/// auth changes or unavailability.
161/// See <https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html>
162pub const KAFKA_RECONNECT_BACKOFF_MAX: Config<Duration> = Config::new(
163    "kafka_reconnect_backoff_max",
164    Duration::from_secs(30),
165    "Sets reconnect.backoff.max.ms in librdkafka for sources and sinks.",
166);
167
168/// Sets message.max.bytes in librdkafka for Kafka sink producers.
169/// Maximum Kafka protocol request message size. Producer-side, this controls
170/// the maximum size of a single message (including framing) that the client
171/// will allow. Defaults to the librdkafka default of 1,000,000 bytes.
172/// See <https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html>
173pub const KAFKA_SINK_MESSAGE_MAX_BYTES: Config<usize> = Config::new(
174    "kafka_sink_message_max_bytes",
175    1_000_000,
176    "Sets message.max.bytes in librdkafka for Kafka sink producers.",
177);
178
179/// Sets batch.size in librdkafka for Kafka sink producers.
180/// Maximum size (in bytes) of all messages batched in one MessageSet, including
181/// protocol framing overhead. Defaults to the librdkafka default of 1,000,000
182/// bytes.
183/// See <https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html>
184pub const KAFKA_SINK_BATCH_SIZE: Config<usize> = Config::new(
185    "kafka_sink_batch_size",
186    1_000_000,
187    "Sets batch.size in librdkafka for Kafka sink producers.",
188);
189
190/// Sets batch.num.messages in librdkafka for Kafka sink producers.
191/// Maximum number of messages batched in one MessageSet. Defaults to the
192/// librdkafka default of 10,000 messages.
193/// See <https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html>
194pub const KAFKA_SINK_BATCH_NUM_MESSAGES: Config<usize> = Config::new(
195    "kafka_sink_batch_num_messages",
196    10_000,
197    "Sets batch.num.messages in librdkafka for Kafka sink producers.",
198);
199
200// MySQL
201
202/// Replication heartbeat interval requested from the MySQL server.
203pub const MYSQL_REPLICATION_HEARTBEAT_INTERVAL: Config<Duration> = Config::new(
204    "mysql_replication_heartbeat_interval",
205    Duration::from_secs(30),
206    "Replication heartbeat interval requested from the MySQL server.",
207);
208
209// Postgres
210
211/// Interval to poll `confirmed_flush_lsn` to get a resumption lsn.
212pub const PG_FETCH_SLOT_RESUME_LSN_INTERVAL: Config<Duration> = Config::new(
213    "postgres_fetch_slot_resume_lsn_interval",
214    Duration::from_millis(500),
215    "Interval to poll `confirmed_flush_lsn` to get a resumption lsn.",
216);
217
218/// Interval to re-validate the schemas of ingested tables.
219pub const PG_SCHEMA_VALIDATION_INTERVAL: Config<Duration> = Config::new(
220    "pg_schema_validation_interval",
221    Duration::from_secs(15),
222    "Interval to re-validate the schemas of ingested tables.",
223);
224
225/// Controls behavior of PG Source when the upstream DB timeline changes. The default behavior
226/// is to emit a definite error forcing source recreation. In cases of HA, the upstream DB may
227/// provide guarantees of failover without loss of data (e.g. CloudSQL maintenance). Changing this
228/// flag puts the onus on the customer to recreate the source if the upstream DB changes timeline
229/// in a way that introduces data loss (e.g. manual failover, restore, etc.).
230pub static PG_SOURCE_VALIDATE_TIMELINE: Config<bool> = Config::new(
231    "pg_source_validate_timeline",
232    true,
233    "Whether to treat a timeline switch as a definite error",
234);
235
236/// Controls behavior of the SQL Server source when the upstream DB restore history changes. The
237/// default behavior is to emit a definite error, forcing source recreation.  In cases of Always
238/// On Availability Group (AOAG), the upstream DB may guarantee continuity without loss of data.
239/// Changing this flag puts the onus on the customer to recreate the source if the upstream DB
240/// changes in a way that introduces data loss.
241pub static SQL_SERVER_SOURCE_VALIDATE_RESTORE_HISTORY: Config<bool> = Config::new(
242    "sql_server_source_validate_restore_history",
243    true,
244    "Whether to treat a restore history change as a definite error",
245);
246
247// Networking
248
249/// Whether or not to enforce that external connection addresses are global
250/// (not private or local) when resolving them.
251pub const ENFORCE_EXTERNAL_ADDRESSES: Config<bool> = Config::new(
252    "storage_enforce_external_addresses",
253    false,
254    "Whether or not to enforce that external connection addresses are global \
255          (not private or local) when resolving them",
256);
257
258// Upsert
259
260/// Whether or not to prevent buffering the entire _upstream_ snapshot in
261/// memory when processing it in memory. This is generally understood to reduce
262/// memory consumption.
263///
264/// When false, in general the memory utilization while processing the snapshot is:
265/// # of snapshot updates + (# of unique keys in snapshot * N), where N is some small
266/// integer number of buffers
267///
268/// When true, in general the memory utilization while processing the snapshot is:
269/// # of snapshot updates + (RocksDB buffers + # of keys in batch produced by upstream) * # of
270/// workers.
271///
272/// Without hydration flow control, which is not yet implemented, there are workloads that may
273/// cause the latter to use more memory, which is why we offer this configuration.
274pub const STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING: Config<bool> = Config::new(
275    "storage_upsert_prevent_snapshot_buffering",
276    true,
277    "Prevent snapshot buffering in upsert.",
278);
279
280/// Whether to enable the merge operator in upsert for the RocksDB backend.
281pub const STORAGE_ROCKSDB_USE_MERGE_OPERATOR: Config<bool> = Config::new(
282    "storage_rocksdb_use_merge_operator",
283    true,
284    "Use the native rocksdb merge operator where possible.",
285);
286
287/// If `storage_upsert_prevent_snapshot_buffering` is true, this prevents the upsert
288/// operator from buffering too many events from the upstream snapshot. In the absence
289/// of hydration flow control, this could prevent certain workloads from causing egregiously
290/// large writes to RocksDB.
291pub const STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING: Config<Option<usize>> = Config::new(
292    "storage_upsert_max_snapshot_batch_buffering",
293    None,
294    "Limit snapshot buffering in upsert.",
295);
296
297// RocksDB
298
299/// How many times to try to cleanup old RocksDB DB's on disk before giving up.
300pub const STORAGE_ROCKSDB_CLEANUP_TRIES: Config<usize> = Config::new(
301    "storage_rocksdb_cleanup_tries",
302    5,
303    "How many times to try to cleanup old RocksDB DB's on disk before giving up.",
304);
305
306/// Delay interval when reconnecting to a source / sink after halt.
307pub const STORAGE_SUSPEND_AND_RESTART_DELAY: Config<Duration> = Config::new(
308    "storage_suspend_and_restart_delay",
309    Duration::from_secs(5),
310    "Delay interval when reconnecting to a source / sink after halt.",
311);
312
313/// Whether to use the new continual feedback upsert operator.
314pub const STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT: Config<bool> = Config::new(
315    "storage_use_continual_feedback_upsert",
316    true,
317    "Whether to use the new continual feedback upsert operator.",
318);
319
320/// Whether to use the v2 upsert operator.
321pub const ENABLE_UPSERT_V2: Config<bool> = Config::new(
322    "enable_upsert_v2",
323    false,
324    "Whether to use the v2 upsert operator.",
325);
326
327/// The interval at which the storage server performs maintenance tasks.
328pub const STORAGE_SERVER_MAINTENANCE_INTERVAL: Config<Duration> = Config::new(
329    "storage_server_maintenance_interval",
330    Duration::from_millis(10),
331    "The interval at which the storage server performs maintenance tasks. Zero enables maintenance on every iteration.",
332);
333
334/// If set, iteratively search the progress topic for a progress record with increasing lookback.
335pub const SINK_PROGRESS_SEARCH: Config<bool> = Config::new(
336    "storage_sink_progress_search",
337    true,
338    "If set, iteratively search the progress topic for a progress record with increasing lookback.",
339);
340
341/// Configure how to behave when trying to create an existing topic with specified configs.
342pub const SINK_ENSURE_TOPIC_CONFIG: Config<&'static str> = Config::new(
343    "storage_sink_ensure_topic_config",
344    "skip",
345    "If `skip`, don't check the config of existing topics; if `check`, fetch the config and \
346    warn if it does not match the expected configs; if `alter`, attempt to change the upstream to \
347    match the expected configs.",
348);
349
350/// Configure mz-ore overflowing type behavior.
351pub const ORE_OVERFLOWING_BEHAVIOR: Config<&'static str> = Config::new(
352    "ore_overflowing_behavior",
353    "soft_panic",
354    "Overflow behavior for Overflowing types. One of 'ignore', 'panic', 'soft_panic'.",
355);
356
357/// The time after which we delete per-replica statistics (for sources and
358/// sinks) after there have been no updates.
359///
360/// This time is opportunistic, statistics are not guaranteed to be deleted
361/// after the retention time runs out.
362pub const STATISTICS_RETENTION_DURATION: Config<Duration> = Config::new(
363    "storage_statistics_retention_duration",
364    Duration::from_secs(86_400), /* one day */
365    "The time after which we delete per replica statistics (for sources and sinks) after there have been no updates.",
366);
367
368/// Adds the full set of all storage `Config`s.
369pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
370    configs
371        .add(&CLUSTER_SHUTDOWN_GRACE_PERIOD)
372        .add(&DELAY_SOURCES_PAST_REHYDRATION)
373        .add(&ENFORCE_EXTERNAL_ADDRESSES)
374        .add(&KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS)
375        .add(&KAFKA_CLIENT_ID_ENRICHMENT_RULES)
376        .add(&KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM)
377        .add(&KAFKA_LOW_WATERMARK_CHECK)
378        .add(&KAFKA_POLL_MAX_WAIT)
379        .add(&KAFKA_RETRY_BACKOFF)
380        .add(&KAFKA_RETRY_BACKOFF_MAX)
381        .add(&KAFKA_RECONNECT_BACKOFF)
382        .add(&KAFKA_RECONNECT_BACKOFF_MAX)
383        .add(&KAFKA_SINK_MESSAGE_MAX_BYTES)
384        .add(&KAFKA_SINK_BATCH_SIZE)
385        .add(&KAFKA_SINK_BATCH_NUM_MESSAGES)
386        .add(&MYSQL_REPLICATION_HEARTBEAT_INTERVAL)
387        .add(&ORE_OVERFLOWING_BEHAVIOR)
388        .add(&PG_FETCH_SLOT_RESUME_LSN_INTERVAL)
389        .add(&PG_SCHEMA_VALIDATION_INTERVAL)
390        .add(&PG_SOURCE_VALIDATE_TIMELINE)
391        .add(&REPLICA_METRICS_HISTORY_RETENTION_INTERVAL)
392        .add(&SINK_ENSURE_TOPIC_CONFIG)
393        .add(&SINK_PROGRESS_SEARCH)
394        .add(&SQL_SERVER_SOURCE_VALIDATE_RESTORE_HISTORY)
395        .add(&STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION)
396        .add(&STORAGE_ROCKSDB_CLEANUP_TRIES)
397        .add(&STORAGE_ROCKSDB_USE_MERGE_OPERATOR)
398        .add(&STORAGE_SERVER_MAINTENANCE_INTERVAL)
399        .add(&STORAGE_SUSPEND_AND_RESTART_DELAY)
400        .add(&STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING)
401        .add(&STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING)
402        .add(&STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT)
403        .add(&ENABLE_UPSERT_V2)
404        .add(&SUSPENDABLE_SOURCES)
405        .add(&WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL)
406        .add(&WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL)
407        .add(&crate::sources::sql_server::CDC_CLEANUP_CHANGE_TABLE)
408        .add(&crate::sources::sql_server::CDC_CLEANUP_CHANGE_TABLE_MAX_DELETES)
409        .add(&crate::sources::sql_server::MAX_LSN_WAIT)
410        .add(&crate::sources::sql_server::SNAPSHOT_PROGRESS_REPORT_INTERVAL)
411        .add(&STATISTICS_RETENTION_DURATION)
412}