1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Dyncfgs used by the storage layer. Despite their name, these can be used
//! "statically" during rendering, or dynamically within timely operators.

use mz_dyncfg::{Config, ConfigSet};
use std::time::Duration;

/// How long to wait before panicking the process once a dataflow has observed an invariant
/// violation.
///
/// Such a violation is caused either by a bug or by the cluster being shut down. The panic,
/// issued after this delay, is what registers the invariant violation. Defaults to ten minutes.
pub const CLUSTER_SHUTDOWN_GRACE_PERIOD: Config<Duration> = Config::new(
    "storage_cluster_shutdown_grace_period",
    // Ten minutes, expressed in seconds.
    Duration::from_secs(600),
    "When dataflows observe an invariant violation it is either due to a bug or due to \
     the cluster being shut down. This configuration defines the amount of time to \
     wait before panicking the process, which will register the invariant violation.",
);

// Flow control

/// Configuration for basic hydration backpressure: whether or not to delay sources from
/// producing values in some scenarios (namely, upsert) until after rehydration is finished.
pub const DELAY_SOURCES_PAST_REHYDRATION: Config<bool> = Config::new(
    "storage_dataflow_delay_sources_past_rehydration",
    // This was originally `false`; it now defaults to `true`.
    true,
    "Whether or not to delay sources producing values in some scenarios \
        (namely, upsert) till after rehydration is finished",
);

/// Controls whether storage dataflows suspend their own execution while operators downstream of
/// them are still processing data. Enabled by default.
pub const SUSPENDABLE_SOURCES: Config<bool> = Config::new(
    "storage_dataflow_suspendable_sources",
    true,
    "Whether storage dataflows should suspend execution while downstream operators are still \
     processing data.",
);

// Controller

/// When enabled, force-downgrade the controller's since handle on the shard
/// during shard finalization.
pub const STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION: Config<bool> = Config::new(
    "storage_downgrade_since_during_finalization",
    // This was originally `false`; it now defaults to `true`.
    true,
    // NB: keep the space before the trailing `\`. A string-continuation escape drops the newline
    // and all leading whitespace of the next line, so without it the words would run together.
    "When enabled, force-downgrade the controller's since handle on the shard \
        during shard finalization",
);

/// How much replica metrics history is kept whenever that history is truncated.
/// Defaults to 30 days.
pub const REPLICA_METRICS_HISTORY_RETENTION_INTERVAL: Config<Duration> = Config::new(
    "replica_metrics_history_retention_interval",
    // 30 days * 24 hours * 60 minutes * 60 seconds.
    Duration::from_secs(30 * 24 * 60 * 60),
    "The interval of time to keep when truncating the replica metrics history.",
);

/// How much wallclock lag history is kept whenever that history is truncated.
/// Defaults to 30 days.
pub const WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL: Config<Duration> = Config::new(
    "wallclock_lag_history_retention_interval",
    // 30 days * 24 hours * 60 minutes * 60 seconds.
    Duration::from_secs(30 * 24 * 60 * 60),
    "The interval of time to keep when truncating the wallclock lag history.",
);

// Kafka

/// Rules that append extra data to the `client.id` property of Kafka clients.
///
/// The value must be a JSON array of objects, each carrying two string-typed keys: `pattern`
/// and `payload`. Rules are evaluated in the order in which they appear. A rule's `pattern` is a
/// regular expression in the syntax accepted by the Rust `regex` crate. When it matches the
/// address of any broker in the connection, the rule's `payload` is appended to the client ID.
/// The payload is always preceded by a `-` that separates it from the earlier part of the ID.
///
/// The default is an empty array, i.e. no rules.
pub const KAFKA_CLIENT_ID_ENRICHMENT_RULES: Config<fn() -> serde_json::Value> = Config::new(
    "kafka_client_id_enrichment_rules",
    // Equivalent to `serde_json::json!([])`.
    || serde_json::Value::Array(Vec::new()),
    "Rules for enriching the `client.id` property of Kafka clients with additional data.",
);

/// Upper bound on how long to wait before polling rdkafka again to check whether new partitions
/// or data have become available. Defaults to one second.
pub const KAFKA_POLL_MAX_WAIT: Config<Duration> = Config::new(
    "kafka_poll_max_wait",
    Duration::from_millis(1000),
    "The maximum time we will wait before re-polling rdkafka to see if new partitions/data are \
     available.",
);

/// The value set for the `ssl.endpoint.identification.algorithm` option in the Kafka connection
/// config.
///
/// NOTE(review): the name suggests this is the default used for AWS PrivateLink connections;
/// confirm at the call site. The default, `"none"`, disables hostname verification, which
/// matches the default of `librdkafka` versions before 1.9.2.
pub const KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM: Config<&'static str> =
    Config::new(
        "kafka_default_aws_privatelink_endpoint_identification_algorithm",
        // Default to no hostname verification, which is the default in versions of
        // `librdkafka <1.9.2`.
        "none",
        "The value we set for the 'ssl.endpoint.identification.algorithm' option in the Kafka \
         Connection config. default: 'none'",
    );

/// Maximum number of elements the Kafka sink operator's reusable message buffer retains.
///
/// The Kafka sink operator may need to buffer messages before emitting them. As a performance
/// optimization the buffer's allocation is reused, but it is shrunk so that it retains at most
/// this many elements. Defaults to 1000.
pub const KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS: Config<usize> = Config::new(
    "kafka_buffered_event_resize_threshold_elements",
    1000,
    "In the Kafka sink operator we might need to buffer messages before emitting them. As a \
        performance optimization we reuse the buffer allocations, but shrink it to retain at \
        most this number of elements.",
);

// MySQL

/// The heartbeat interval that is requested from the MySQL server for replication.
/// Defaults to 30 seconds.
pub const MYSQL_REPLICATION_HEARTBEAT_INTERVAL: Config<Duration> = Config::new(
    "mysql_replication_heartbeat_interval",
    Duration::from_millis(30_000),
    "Replication heartbeat interval requested from the MySQL server.",
);

/// Interval at which to fetch `offset_known` from the `@@gtid_executed` system variable.
/// Defaults to 10 seconds.
///
/// (`@name` in MySQL denotes a user-defined variable; system variables use `@@name`.)
pub const MYSQL_OFFSET_KNOWN_INTERVAL: Config<Duration> = Config::new(
    "mysql_offset_known_interval",
    Duration::from_secs(10),
    "Interval to fetch `offset_known`, from `@@gtid_executed`",
);

// Postgres

/// How often `confirmed_flush_lsn` is polled to obtain the LSN to resume from.
/// Defaults to half a second.
pub const PG_FETCH_SLOT_RESUME_LSN_INTERVAL: Config<Duration> = Config::new(
    "postgres_fetch_slot_resume_lsn_interval",
    // Half a second.
    Duration::from_millis(500),
    "Interval to poll `confirmed_flush_lsn` to get a resumption lsn.",
);

/// How often `offset_known` is refreshed by reading `pg_current_wal_lsn`.
/// Defaults to 10 seconds.
pub const PG_OFFSET_KNOWN_INTERVAL: Config<Duration> = Config::new(
    "pg_offset_known_interval",
    Duration::from_millis(10_000),
    "Interval to fetch `offset_known`, from `pg_current_wal_lsn`",
);

// Networking

/// When enabled, external connection addresses are required to be global (neither private nor
/// local) when they are resolved. Disabled by default.
pub const ENFORCE_EXTERNAL_ADDRESSES: Config<bool> = Config::new(
    "storage_enforce_external_addresses",
    false,
    "Whether or not to enforce that external connection addresses are global \
     (not private or local) when resolving them",
);

// Upsert

/// Whether or not to prevent upsert from buffering the entire _upstream_ snapshot in memory
/// while processing it. This is generally understood to reduce memory consumption.
///
/// When false, memory utilization while processing the snapshot is, in general, the number of
/// snapshot updates plus the number of unique keys in the snapshot times `N`, where `N` is some
/// small integer number of buffers.
///
/// When true, memory utilization while processing the snapshot is, in general, the number of
/// snapshot updates plus (RocksDB buffers + the number of keys in the batch produced by
/// upstream) times the number of workers.
///
/// Without hydration flow control, which is not yet implemented, there are workloads that may
/// cause the latter to use more memory, which is why we offer this configuration.
pub const STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING: Config<bool> = Config::new(
    "storage_upsert_prevent_snapshot_buffering",
    true,
    "Prevent snapshot buffering in upsert.",
);

/// Controls whether upsert's RocksDB backend uses RocksDB's native merge operator where
/// possible. Enabled by default.
pub const STORAGE_ROCKSDB_USE_MERGE_OPERATOR: Config<bool> = Config::new(
    "storage_rocksdb_use_merge_operator",
    true,
    "Use the native rocksdb merge operator where possible.",
);

/// Caps how many events from the upstream snapshot the upsert operator buffers, when
/// [`STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING`] is enabled.
///
/// Without hydration flow control, this cap can stop certain workloads from issuing
/// egregiously large writes to RocksDB. Defaults to `None` (presumably meaning no cap; confirm
/// at the use site).
pub const STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING: Config<Option<usize>> = Config::new(
    "storage_upsert_max_snapshot_batch_buffering",
    None,
    "Limit snapshot buffering in upsert.",
);

// RocksDB

/// The number of attempts made to clean up old RocksDB databases on disk before giving up.
/// Defaults to 5.
pub const STORAGE_ROCKSDB_CLEANUP_TRIES: Config<usize> = Config::new(
    "storage_rocksdb_cleanup_tries",
    5,
    "How many times to try to cleanup old RocksDB DB's on disk before giving up.",
);

/// How long to wait before reconnecting to a source or sink after it halts.
/// Defaults to 5 seconds.
pub const STORAGE_SUSPEND_AND_RESTART_DELAY: Config<Duration> = Config::new(
    "storage_suspend_and_restart_delay",
    Duration::from_millis(5_000),
    "Delay interval when reconnecting to a source / sink after halt.",
);

/// Whether to mint reclock bindings based on the latest probed upstream frontier (offset) or on
/// the currently ingested frontier (offset).
///
/// When enabled, bindings follow the latest probed frontier. Otherwise (the default) they
/// follow the currently ingested frontier.
pub const STORAGE_RECLOCK_TO_LATEST: Config<bool> = Config::new(
    "storage_reclock_to_latest",
    false,
    "Whether to mint reclock bindings based on the latest probed offset or the latest ingested \
     offset.",
);

/// Opt-in switch for the new continual-feedback upsert operator. Disabled by default.
pub const STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT: Config<bool> = Config::new(
    "storage_use_continual_feedback_upsert",
    false,
    "Whether to use the new continual feedback upsert operator.",
);

/// Registers every storage [`Config`] declared in this module with `configs` and returns the
/// resulting set.
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
    // General and flow control.
    let configs = configs
        .add(&CLUSTER_SHUTDOWN_GRACE_PERIOD)
        .add(&DELAY_SOURCES_PAST_REHYDRATION)
        .add(&SUSPENDABLE_SOURCES);

    // Controller.
    let configs = configs
        .add(&STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION)
        .add(&REPLICA_METRICS_HISTORY_RETENTION_INTERVAL)
        .add(&WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL);

    // Kafka.
    let configs = configs
        .add(&KAFKA_CLIENT_ID_ENRICHMENT_RULES)
        .add(&KAFKA_POLL_MAX_WAIT)
        .add(&KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM)
        .add(&KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS);

    // MySQL and Postgres.
    let configs = configs
        .add(&MYSQL_REPLICATION_HEARTBEAT_INTERVAL)
        .add(&MYSQL_OFFSET_KNOWN_INTERVAL)
        .add(&PG_FETCH_SLOT_RESUME_LSN_INTERVAL)
        .add(&PG_OFFSET_KNOWN_INTERVAL);

    // Networking.
    let configs = configs.add(&ENFORCE_EXTERNAL_ADDRESSES);

    // Upsert, RocksDB, and the remaining storage knobs.
    configs
        .add(&STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING)
        .add(&STORAGE_ROCKSDB_USE_MERGE_OPERATOR)
        .add(&STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING)
        .add(&STORAGE_ROCKSDB_CLEANUP_TRIES)
        .add(&STORAGE_SUSPEND_AND_RESTART_DELAY)
        .add(&STORAGE_RECLOCK_TO_LATEST)
        .add(&STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT)
}