use mz_dyncfg::{Config, ConfigSet};
use std::time::Duration;
pub const CLUSTER_SHUTDOWN_GRACE_PERIOD: Config<Duration> = Config::new(
"storage_cluster_shutdown_grace_period",
Duration::from_secs(10 * 60),
"When dataflows observe an invariant violation it is either due to a bug or due to \
the cluster being shut down. This configuration defines the amount of time to \
wait before panicking the process, which will register the invariant violation.",
);
pub const DELAY_SOURCES_PAST_REHYDRATION: Config<bool> = Config::new(
"storage_dataflow_delay_sources_past_rehydration",
true,
"Whether or not to delay sources producing values in some scenarios \
(namely, upsert) till after rehydration is finished",
);
pub const SUSPENDABLE_SOURCES: Config<bool> = Config::new(
"storage_dataflow_suspendable_sources",
true,
"Whether storage dataflows should suspend execution while downstream operators are still \
processing data.",
);
pub const STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION: Config<bool> = Config::new(
"storage_downgrade_since_during_finalization",
true,
"When enabled, force-downgrade the controller's since handle on the shard\
during shard finalization",
);
pub const REPLICA_METRICS_HISTORY_RETENTION_INTERVAL: Config<Duration> = Config::new(
"replica_metrics_history_retention_interval",
Duration::from_secs(60 * 60 * 24 * 30),
"The interval of time to keep when truncating the replica metrics history.",
);
pub const WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL: Config<Duration> = Config::new(
"wallclock_lag_history_retention_interval",
Duration::from_secs(60 * 60 * 24 * 30),
"The interval of time to keep when truncating the wallclock lag history.",
);
pub const KAFKA_CLIENT_ID_ENRICHMENT_RULES: Config<fn() -> serde_json::Value> = Config::new(
"kafka_client_id_enrichment_rules",
|| serde_json::json!([]),
"Rules for enriching the `client.id` property of Kafka clients with additional data.",
);
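// Note: the default above is a function rather than a plain value, presumably
// because `serde_json::Value` cannot be constructed in a `const` context;
// `mz_dyncfg` evaluates the `fn()` lazily to produce the default.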
pub const KAFKA_POLL_MAX_WAIT: Config<Duration> = Config::new(
"kafka_poll_max_wait",
Duration::from_secs(1),
"The maximum time we will wait before re-polling rdkafka to see if new partitions/data are \
available.",
);
pub const KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM: Config<&'static str> =
Config::new(
"kafka_default_aws_privatelink_endpoint_identification_algorithm",
"none",
"The value we set for the 'ssl.endpoint.identification.algorithm' option in the Kafka \
Connection config. default: 'none'",
);
pub const KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS: Config<usize> = Config::new(
"kafka_buffered_event_resize_threshold_elements",
1000,
"In the Kafka sink operator we might need to buffer messages before emitting them. As a \
performance optimization we reuse the buffer allocations, but shrink it to retain at \
most this number of elements.",
);
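// A hypothetical sketch (not the actual sink code) of the reuse-and-shrink
// pattern this threshold controls:
//
//     buffer.clear();
//     let threshold = KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS.get(&cfg);
//     if buffer.capacity() > threshold {
//         // `Vec::shrink_to` reduces capacity to (roughly) the given lower bound.
//         buffer.shrink_to(threshold);
//     }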
pub const MYSQL_REPLICATION_HEARTBEAT_INTERVAL: Config<Duration> = Config::new(
"mysql_replication_heartbeat_interval",
Duration::from_secs(30),
"Replication heartbeat interval requested from the MySQL server.",
);
pub const MYSQL_OFFSET_KNOWN_INTERVAL: Config<Duration> = Config::new(
"mysql_offset_known_interval",
Duration::from_secs(10),
"Interval to fetch `offset_known`, from `@gtid_executed`",
);
pub const PG_FETCH_SLOT_RESUME_LSN_INTERVAL: Config<Duration> = Config::new(
"postgres_fetch_slot_resume_lsn_interval",
Duration::from_millis(500),
"Interval to poll `confirmed_flush_lsn` to get a resumption lsn.",
);
pub const PG_OFFSET_KNOWN_INTERVAL: Config<Duration> = Config::new(
"pg_offset_known_interval",
Duration::from_secs(10),
"Interval to fetch `offset_known`, from `pg_current_wal_lsn`",
);
pub const ENFORCE_EXTERNAL_ADDRESSES: Config<bool> = Config::new(
"storage_enforce_external_addresses",
false,
"Whether or not to enforce that external connection addresses are global \
(not private or local) when resolving them",
);
pub const STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING: Config<bool> = Config::new(
"storage_upsert_prevent_snapshot_buffering",
true,
"Prevent snapshot buffering in upsert.",
);
pub const STORAGE_ROCKSDB_USE_MERGE_OPERATOR: Config<bool> = Config::new(
"storage_rocksdb_use_merge_operator",
true,
"Use the native rocksdb merge operator where possible.",
);
pub const STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING: Config<Option<usize>> = Config::new(
"storage_upsert_max_snapshot_batch_buffering",
None,
"Limit snapshot buffering in upsert.",
);
pub const STORAGE_ROCKSDB_CLEANUP_TRIES: Config<usize> = Config::new(
"storage_rocksdb_cleanup_tries",
5,
"How many times to try to cleanup old RocksDB DB's on disk before giving up.",
);
pub const STORAGE_SUSPEND_AND_RESTART_DELAY: Config<Duration> = Config::new(
"storage_suspend_and_restart_delay",
Duration::from_secs(5),
"Delay interval when reconnecting to a source / sink after halt.",
);
pub const STORAGE_RECLOCK_TO_LATEST: Config<bool> = Config::new(
"storage_reclock_to_latest",
false,
"Whether to mint reclock bindings based on the latest probed offset or the latest ingested offset."
);
pub const STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT: Config<bool> = Config::new(
"storage_use_continual_feedback_upsert",
false,
"Whether to use the new continual feedback upsert operator.",
);
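/// Adds the full set of storage configs defined in this file to the given
/// [`ConfigSet`].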
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
configs
.add(&CLUSTER_SHUTDOWN_GRACE_PERIOD)
.add(&DELAY_SOURCES_PAST_REHYDRATION)
.add(&SUSPENDABLE_SOURCES)
.add(&STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION)
.add(&REPLICA_METRICS_HISTORY_RETENTION_INTERVAL)
.add(&WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL)
.add(&KAFKA_CLIENT_ID_ENRICHMENT_RULES)
.add(&KAFKA_POLL_MAX_WAIT)
.add(&KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM)
.add(&KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS)
.add(&MYSQL_REPLICATION_HEARTBEAT_INTERVAL)
.add(&MYSQL_OFFSET_KNOWN_INTERVAL)
.add(&PG_FETCH_SLOT_RESUME_LSN_INTERVAL)
.add(&PG_OFFSET_KNOWN_INTERVAL)
.add(&ENFORCE_EXTERNAL_ADDRESSES)
.add(&STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING)
.add(&STORAGE_ROCKSDB_USE_MERGE_OPERATOR)
.add(&STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING)
.add(&STORAGE_ROCKSDB_CLEANUP_TRIES)
.add(&STORAGE_SUSPEND_AND_RESTART_DELAY)
.add(&STORAGE_RECLOCK_TO_LATEST)
.add(&STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT)
}
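
// A minimal usage sketch, assuming the `ConfigSet::default` constructor and the
// `Config::get` accessor from `mz_dyncfg`: register all configs, then read
// values back. Until dynamic updates are applied, `get` returns the
// compiled-in defaults declared above.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn all_dyncfgs_serve_defaults() {
        let configs = all_dyncfgs(ConfigSet::default());
        assert_eq!(
            CLUSTER_SHUTDOWN_GRACE_PERIOD.get(&configs),
            Duration::from_secs(10 * 60)
        );
        assert!(DELAY_SOURCES_PAST_REHYDRATION.get(&configs));
        assert_eq!(STORAGE_ROCKSDB_CLEANUP_TRIES.get(&configs), 5);
    }
}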