use std::time::Duration;
use mz_ore::cast::CastFrom;
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use mz_service::params::GrpcClientParameters;
use mz_ssh_util::tunnel::SshTimeoutConfig;
use mz_tracing::params::TracingParameters;
use serde::{Deserialize, Serialize};
include!(concat!(env!("OUT_DIR"), "/mz_storage_types.parameters.rs"));
/// Default for `StorageParameters::pg_source_connect_timeout`: 30 seconds.
pub const DEFAULT_PG_SOURCE_CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// Default for `StorageParameters::pg_source_tcp_keepalives_interval`: 10 seconds.
pub const DEFAULT_PG_SOURCE_TCP_KEEPALIVES_INTERVAL: Duration = Duration::from_secs(10);
/// Default for `StorageParameters::pg_source_tcp_keepalives_idle`: 10 seconds.
pub const DEFAULT_PG_SOURCE_TCP_KEEPALIVES_IDLE: Duration = Duration::from_secs(10);
/// Default for `StorageParameters::pg_source_tcp_keepalives_retries`: 5.
pub const DEFAULT_PG_SOURCE_TCP_KEEPALIVES_RETRIES: u32 = 5;
/// Default for `StorageParameters::pg_source_tcp_user_timeout`: 40 seconds.
pub const DEFAULT_PG_SOURCE_TCP_USER_TIMEOUT: Duration = Duration::from_secs(40);
/// Default for `StorageParameters::pg_source_tcp_configure_server`: `false`.
pub const DEFAULT_PG_SOURCE_TCP_CONFIGURE_SERVER: bool = false;
/// Default for `StorageParameters::pg_source_wal_sender_timeout`: `None`.
pub const DEFAULT_PG_SOURCE_WAL_SENDER_TIMEOUT: Option<Duration> = None;
/// Storage configuration parameters, bundled into one value.
///
/// An instance can be applied onto another with `StorageParameters::update`
/// and is converted to and from `ProtoStorageParameters` by the `RustType`
/// implementation in this file.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct StorageParameters {
/// Defaults to `Some(DEFAULT_PG_SOURCE_CONNECT_TIMEOUT)`.
pub pg_source_connect_timeout: Option<Duration>,
/// Defaults to `Some(DEFAULT_PG_SOURCE_TCP_KEEPALIVES_RETRIES)`.
pub pg_source_tcp_keepalives_retries: Option<u32>,
/// Defaults to `Some(DEFAULT_PG_SOURCE_TCP_KEEPALIVES_IDLE)`.
pub pg_source_tcp_keepalives_idle: Option<Duration>,
/// Defaults to `Some(DEFAULT_PG_SOURCE_TCP_KEEPALIVES_INTERVAL)`.
pub pg_source_tcp_keepalives_interval: Option<Duration>,
/// Defaults to `Some(DEFAULT_PG_SOURCE_TCP_USER_TIMEOUT)`.
pub pg_source_tcp_user_timeout: Option<Duration>,
/// Defaults to `DEFAULT_PG_SOURCE_TCP_CONFIGURE_SERVER`.
pub pg_source_tcp_configure_server: bool,
/// Defaults to `mz_postgres_util::DEFAULT_SNAPSHOT_STATEMENT_TIMEOUT`.
pub pg_source_snapshot_statement_timeout: Duration,
/// Defaults to `DEFAULT_PG_SOURCE_WAL_SENDER_TIMEOUT`.
pub pg_source_wal_sender_timeout: Option<Duration>,
/// Defaults to `mz_mysql_util::TimeoutConfig::default()`.
pub mysql_source_timeouts: mz_mysql_util::TimeoutConfig,
/// Defaults to `0`; encoded as `u64` in the protobuf form.
pub keep_n_source_status_history_entries: usize,
/// Defaults to `0`; encoded as `u64` in the protobuf form.
pub keep_n_sink_status_history_entries: usize,
/// Defaults to `0`; encoded as `u64` in the protobuf form.
pub keep_n_privatelink_status_history_entries: usize,
/// Defaults to `REPLICA_STATUS_HISTORY_RETENTION_WINDOW_DEFAULT`.
pub replica_status_history_retention_window: Duration,
/// Defaults to `RocksDBTuningParameters::default()`.
pub upsert_rocksdb_tuning_config: mz_rocksdb_types::RocksDBTuningParameters,
/// Defaults to `false`.
pub finalize_shards: bool,
/// `StorageParameters::update` merges this through `TracingParameters::update`
/// instead of overwriting it.
pub tracing: TracingParameters,
/// Defaults to `UpsertAutoSpillConfig::default()`.
pub upsert_auto_spill_config: UpsertAutoSpillConfig,
/// Defaults to `StorageMaxInflightBytesConfig::default()`.
pub storage_dataflow_max_inflight_bytes_config: StorageMaxInflightBytesConfig,
/// `StorageParameters::update` merges this through `GrpcClientParameters::update`
/// instead of overwriting it.
pub grpc_client: GrpcClientParameters,
/// Defaults to `0`; encoded as `u64` in the protobuf form.
pub shrink_upsert_unused_buffers_by_ratio: usize,
/// Defaults to `true`.
pub record_namespaced_errors: bool,
/// Defaults to `SshTimeoutConfig::default()`.
pub ssh_timeout_config: SshTimeoutConfig,
/// Defaults to `mz_kafka_util::client::TimeoutConfig::default()`.
pub kafka_timeout_config: mz_kafka_util::client::TimeoutConfig,
/// Defaults to `STATISTICS_INTERVAL_DEFAULT`.
pub statistics_interval: Duration,
/// Defaults to `STATISTICS_COLLECTION_INTERVAL_DEFAULT`.
pub statistics_collection_interval: Duration,
/// Defaults to `PgSourceSnapshotConfig::default()`.
pub pg_snapshot_config: PgSourceSnapshotConfig,
/// Defaults to `STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT`.
pub user_storage_managed_collections_batch_duration: Duration,
/// `StorageParameters::update` extends this with the incoming updates
/// instead of replacing it.
pub dyncfg_updates: mz_dyncfg::ConfigUpdates,
}
/// Default for `StorageParameters::statistics_interval`: 60 seconds.
pub const STATISTICS_INTERVAL_DEFAULT: Duration = Duration::from_secs(60);
/// Default for `StorageParameters::statistics_collection_interval`: 10 seconds.
pub const STATISTICS_COLLECTION_INTERVAL_DEFAULT: Duration = Duration::from_secs(10);
/// Default for `StorageParameters::user_storage_managed_collections_batch_duration`: 1 second.
pub const STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT: Duration = Duration::from_secs(1);
/// Default for `StorageParameters::replica_status_history_retention_window`:
/// 30 days, expressed in seconds.
pub const REPLICA_STATUS_HISTORY_RETENTION_WINDOW_DEFAULT: Duration =
    Duration::from_secs(30 * 24 * 60 * 60);

impl Default for StorageParameters {
    /// Builds the baseline parameter set.
    ///
    /// Fields that have a dedicated `*_DEFAULT` / `DEFAULT_*` constant use it;
    /// primitive fields without one are spelled out as literals; the remaining
    /// fields fall back to their type's own `Default`.
    fn default() -> Self {
        Self {
            // Postgres source settings.
            pg_source_connect_timeout: Some(DEFAULT_PG_SOURCE_CONNECT_TIMEOUT),
            pg_source_tcp_keepalives_retries: Some(DEFAULT_PG_SOURCE_TCP_KEEPALIVES_RETRIES),
            pg_source_tcp_keepalives_idle: Some(DEFAULT_PG_SOURCE_TCP_KEEPALIVES_IDLE),
            pg_source_tcp_keepalives_interval: Some(DEFAULT_PG_SOURCE_TCP_KEEPALIVES_INTERVAL),
            pg_source_tcp_user_timeout: Some(DEFAULT_PG_SOURCE_TCP_USER_TIMEOUT),
            pg_source_tcp_configure_server: DEFAULT_PG_SOURCE_TCP_CONFIGURE_SERVER,
            pg_source_snapshot_statement_timeout:
                mz_postgres_util::DEFAULT_SNAPSHOT_STATEMENT_TIMEOUT,
            pg_source_wal_sender_timeout: DEFAULT_PG_SOURCE_WAL_SENDER_TIMEOUT,

            // MySQL source settings.
            mysql_source_timeouts: Default::default(),

            // Status history retention.
            keep_n_source_status_history_entries: 0,
            keep_n_sink_status_history_entries: 0,
            keep_n_privatelink_status_history_entries: 0,
            replica_status_history_retention_window:
                REPLICA_STATUS_HISTORY_RETENTION_WINDOW_DEFAULT,

            // Everything else.
            upsert_rocksdb_tuning_config: Default::default(),
            finalize_shards: false,
            tracing: Default::default(),
            upsert_auto_spill_config: Default::default(),
            storage_dataflow_max_inflight_bytes_config: Default::default(),
            grpc_client: Default::default(),
            shrink_upsert_unused_buffers_by_ratio: 0,
            record_namespaced_errors: true,
            ssh_timeout_config: Default::default(),
            kafka_timeout_config: Default::default(),
            statistics_interval: STATISTICS_INTERVAL_DEFAULT,
            statistics_collection_interval: STATISTICS_COLLECTION_INTERVAL_DEFAULT,
            pg_snapshot_config: Default::default(),
            user_storage_managed_collections_batch_duration:
                STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT,
            dyncfg_updates: Default::default(),
        }
    }
}
/// Settings stored in `StorageParameters::storage_dataflow_max_inflight_bytes_config`.
///
/// The `Default` implementation leaves both limits as `None` and sets
/// `disk_only` to `true`.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct StorageMaxInflightBytesConfig {
/// Encoded as `u64` in `ProtoStorageMaxInflightBytesConfig`.
/// NOTE(review): presumably an absolute byte limit — confirm with the consumer.
pub max_inflight_bytes_default: Option<usize>,
/// NOTE(review): presumably a fraction of the cluster size used to derive the
/// limit — confirm with the consumer.
pub max_inflight_bytes_cluster_size_fraction: Option<f64>,
/// NOTE(review): presumably restricts the limit to disk-backed setups — confirm
/// with the consumer.
pub disk_only: bool,
}
impl Default for StorageMaxInflightBytesConfig {
    /// Leaves both limits unset and turns `disk_only` on.
    fn default() -> Self {
        Self {
            max_inflight_bytes_default: None,
            max_inflight_bytes_cluster_size_fraction: None,
            disk_only: true,
        }
    }
}
impl RustType<ProtoStorageMaxInflightBytesConfig> for StorageMaxInflightBytesConfig {
    /// Converts to the protobuf form, casting the byte limit from `usize` to `u64`.
    fn into_proto(&self) -> ProtoStorageMaxInflightBytesConfig {
        // Exhaustive destructuring: a newly added field must be handled here.
        let Self {
            max_inflight_bytes_default,
            max_inflight_bytes_cluster_size_fraction,
            disk_only,
        } = self;

        ProtoStorageMaxInflightBytesConfig {
            max_in_flight_bytes_default: max_inflight_bytes_default.map(u64::cast_from),
            max_in_flight_bytes_cluster_size_fraction: *max_inflight_bytes_cluster_size_fraction,
            disk_only: *disk_only,
        }
    }

    /// Converts from the protobuf form, casting the byte limit from `u64` to `usize`.
    ///
    /// This conversion has no failure path; it always returns `Ok`.
    fn from_proto(proto: ProtoStorageMaxInflightBytesConfig) -> Result<Self, TryFromProtoError> {
        let ProtoStorageMaxInflightBytesConfig {
            max_in_flight_bytes_default,
            max_in_flight_bytes_cluster_size_fraction,
            disk_only,
        } = proto;

        Ok(Self {
            max_inflight_bytes_default: max_in_flight_bytes_default.map(usize::cast_from),
            max_inflight_bytes_cluster_size_fraction: max_in_flight_bytes_cluster_size_fraction,
            disk_only,
        })
    }
}
/// Settings stored in `StorageParameters::upsert_auto_spill_config`.
///
/// The derived `Default` yields `allow_spilling_to_disk == false` and
/// `spill_to_disk_threshold_bytes == 0`.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct UpsertAutoSpillConfig {
/// NOTE(review): presumably enables spilling upsert state to disk — confirm
/// with the consumer.
pub allow_spilling_to_disk: bool,
/// Encoded as `u64` in `ProtoUpsertAutoSpillConfig`.
/// NOTE(review): presumably the size, in bytes, at which spilling starts — confirm.
pub spill_to_disk_threshold_bytes: usize,
}
impl RustType<ProtoUpsertAutoSpillConfig> for UpsertAutoSpillConfig {
    /// Converts to the protobuf form, casting the threshold from `usize` to `u64`.
    fn into_proto(&self) -> ProtoUpsertAutoSpillConfig {
        // Exhaustive destructuring: a newly added field must be handled here.
        let Self {
            allow_spilling_to_disk,
            spill_to_disk_threshold_bytes,
        } = self;

        ProtoUpsertAutoSpillConfig {
            allow_spilling_to_disk: *allow_spilling_to_disk,
            spill_to_disk_threshold_bytes: u64::cast_from(*spill_to_disk_threshold_bytes),
        }
    }

    /// Converts from the protobuf form, casting the threshold from `u64` to `usize`.
    ///
    /// This conversion has no failure path; it always returns `Ok`.
    fn from_proto(proto: ProtoUpsertAutoSpillConfig) -> Result<Self, TryFromProtoError> {
        let ProtoUpsertAutoSpillConfig {
            allow_spilling_to_disk,
            spill_to_disk_threshold_bytes,
        } = proto;

        Ok(Self {
            allow_spilling_to_disk,
            spill_to_disk_threshold_bytes: usize::cast_from(spill_to_disk_threshold_bytes),
        })
    }
}
impl StorageParameters {
    /// Applies the values of another `StorageParameters` onto `self`.
    ///
    /// Most fields are overwritten with the incoming value. Three are merged
    /// instead: `tracing` and `grpc_client` are passed to their own `update`
    /// methods, and `dyncfg_updates` is extended with the incoming updates.
    ///
    /// The argument is destructured exhaustively so that adding a field to
    /// the struct without handling it here is a compile error.
    pub fn update(
        &mut self,
        StorageParameters {
            pg_source_connect_timeout,
            pg_source_tcp_keepalives_retries,
            pg_source_tcp_keepalives_idle,
            pg_source_tcp_keepalives_interval,
            pg_source_tcp_user_timeout,
            pg_source_tcp_configure_server,
            pg_source_snapshot_statement_timeout,
            pg_source_wal_sender_timeout,
            mysql_source_timeouts,
            keep_n_source_status_history_entries,
            keep_n_sink_status_history_entries,
            keep_n_privatelink_status_history_entries,
            replica_status_history_retention_window,
            upsert_rocksdb_tuning_config,
            finalize_shards,
            tracing,
            upsert_auto_spill_config,
            storage_dataflow_max_inflight_bytes_config,
            grpc_client,
            shrink_upsert_unused_buffers_by_ratio,
            record_namespaced_errors,
            ssh_timeout_config,
            kafka_timeout_config,
            statistics_interval,
            statistics_collection_interval,
            pg_snapshot_config,
            user_storage_managed_collections_batch_duration,
            dyncfg_updates,
        }: StorageParameters,
    ) {
        self.pg_source_connect_timeout = pg_source_connect_timeout;
        self.pg_source_tcp_keepalives_retries = pg_source_tcp_keepalives_retries;
        self.pg_source_tcp_keepalives_idle = pg_source_tcp_keepalives_idle;
        self.pg_source_tcp_keepalives_interval = pg_source_tcp_keepalives_interval;
        self.pg_source_tcp_user_timeout = pg_source_tcp_user_timeout;
        self.pg_source_tcp_configure_server = pg_source_tcp_configure_server;
        self.pg_source_snapshot_statement_timeout = pg_source_snapshot_statement_timeout;
        self.pg_source_wal_sender_timeout = pg_source_wal_sender_timeout;
        self.mysql_source_timeouts = mysql_source_timeouts;
        self.keep_n_source_status_history_entries = keep_n_source_status_history_entries;
        self.keep_n_sink_status_history_entries = keep_n_sink_status_history_entries;
        self.keep_n_privatelink_status_history_entries = keep_n_privatelink_status_history_entries;
        self.replica_status_history_retention_window = replica_status_history_retention_window;
        self.upsert_rocksdb_tuning_config = upsert_rocksdb_tuning_config;
        self.finalize_shards = finalize_shards;
        // Merged by the field's own `update` rather than overwritten.
        self.tracing.update(tracing);
        self.upsert_auto_spill_config = upsert_auto_spill_config;
        self.storage_dataflow_max_inflight_bytes_config =
            storage_dataflow_max_inflight_bytes_config;
        // Merged by the field's own `update` rather than overwritten.
        self.grpc_client.update(grpc_client);
        self.shrink_upsert_unused_buffers_by_ratio = shrink_upsert_unused_buffers_by_ratio;
        self.record_namespaced_errors = record_namespaced_errors;
        self.ssh_timeout_config = ssh_timeout_config;
        self.kafka_timeout_config = kafka_timeout_config;
        self.statistics_interval = statistics_interval;
        self.statistics_collection_interval = statistics_collection_interval;
        self.pg_snapshot_config = pg_snapshot_config;
        self.user_storage_managed_collections_batch_duration =
            user_storage_managed_collections_batch_duration;
        // Accumulates: the incoming updates are added to the ones already held.
        self.dyncfg_updates.extend(dyncfg_updates);
    }

    /// Returns `true` when `self` compares equal to `Self::default()`.
    ///
    /// Several defaults are `Some(..)` or non-zero, so "unset" here means
    /// "identical to the defaults", not "every field empty".
    pub fn all_unset(&self) -> bool {
        *self == Self::default()
    }
}
impl RustType<ProtoStorageParameters> for StorageParameters {
    /// Encodes every field into its `ProtoStorageParameters` counterpart.
    ///
    /// `usize` values are cast to `u64`, and values whose protobuf field is
    /// an `Option` of a message are wrapped in `Some`.
    fn into_proto(&self) -> ProtoStorageParameters {
        // Exhaustive destructuring: a newly added field must be handled here.
        let Self {
            pg_source_connect_timeout,
            pg_source_tcp_keepalives_retries,
            pg_source_tcp_keepalives_idle,
            pg_source_tcp_keepalives_interval,
            pg_source_tcp_user_timeout,
            pg_source_tcp_configure_server,
            pg_source_snapshot_statement_timeout,
            pg_source_wal_sender_timeout,
            mysql_source_timeouts,
            keep_n_source_status_history_entries,
            keep_n_sink_status_history_entries,
            keep_n_privatelink_status_history_entries,
            replica_status_history_retention_window,
            upsert_rocksdb_tuning_config,
            finalize_shards,
            tracing,
            upsert_auto_spill_config,
            storage_dataflow_max_inflight_bytes_config,
            grpc_client,
            shrink_upsert_unused_buffers_by_ratio,
            record_namespaced_errors,
            ssh_timeout_config,
            kafka_timeout_config,
            statistics_interval,
            statistics_collection_interval,
            pg_snapshot_config,
            user_storage_managed_collections_batch_duration,
            dyncfg_updates,
        } = self;

        ProtoStorageParameters {
            pg_source_connect_timeout: pg_source_connect_timeout.into_proto(),
            pg_source_tcp_keepalives_retries: *pg_source_tcp_keepalives_retries,
            pg_source_tcp_keepalives_idle: pg_source_tcp_keepalives_idle.into_proto(),
            pg_source_tcp_keepalives_interval: pg_source_tcp_keepalives_interval.into_proto(),
            pg_source_tcp_user_timeout: pg_source_tcp_user_timeout.into_proto(),
            pg_source_snapshot_statement_timeout: Some(
                pg_source_snapshot_statement_timeout.into_proto(),
            ),
            pg_source_tcp_configure_server: *pg_source_tcp_configure_server,
            pg_source_wal_sender_timeout: pg_source_wal_sender_timeout.into_proto(),
            mysql_source_timeouts: Some(mysql_source_timeouts.into_proto()),
            keep_n_source_status_history_entries: u64::cast_from(
                *keep_n_source_status_history_entries,
            ),
            keep_n_sink_status_history_entries: u64::cast_from(
                *keep_n_sink_status_history_entries,
            ),
            keep_n_privatelink_status_history_entries: u64::cast_from(
                *keep_n_privatelink_status_history_entries,
            ),
            replica_status_history_retention_window: Some(
                replica_status_history_retention_window.into_proto(),
            ),
            upsert_rocksdb_tuning_config: Some(upsert_rocksdb_tuning_config.into_proto()),
            finalize_shards: *finalize_shards,
            tracing: Some(tracing.into_proto()),
            upsert_auto_spill_config: Some(upsert_auto_spill_config.into_proto()),
            storage_dataflow_max_inflight_bytes_config: Some(
                storage_dataflow_max_inflight_bytes_config.into_proto(),
            ),
            grpc_client: Some(grpc_client.into_proto()),
            shrink_upsert_unused_buffers_by_ratio: u64::cast_from(
                *shrink_upsert_unused_buffers_by_ratio,
            ),
            record_namespaced_errors: *record_namespaced_errors,
            ssh_timeout_config: Some(ssh_timeout_config.into_proto()),
            kafka_timeout_config: Some(kafka_timeout_config.into_proto()),
            statistics_interval: Some(statistics_interval.into_proto()),
            statistics_collection_interval: Some(statistics_collection_interval.into_proto()),
            pg_snapshot_config: Some(pg_snapshot_config.into_proto()),
            user_storage_managed_collections_batch_duration: Some(
                user_storage_managed_collections_batch_duration.into_proto(),
            ),
            // `ConfigUpdates` is carried in the proto as-is, so it is cloned
            // rather than converted.
            dyncfg_updates: Some(dyncfg_updates.clone()),
        }
    }

    /// Decodes a `ProtoStorageParameters` back into `StorageParameters`.
    ///
    /// Fields are converted in the order written below, and the first failed
    /// conversion is returned via `?`. Each `into_rust_if_some` call is given
    /// the `ProtoStorageParameters::<field>` path as its label; a missing
    /// `dyncfg_updates` yields `TryFromProtoError::missing_field`.
    fn from_proto(proto: ProtoStorageParameters) -> Result<Self, TryFromProtoError> {
        let ProtoStorageParameters {
            pg_source_connect_timeout,
            pg_source_tcp_keepalives_retries,
            pg_source_tcp_keepalives_idle,
            pg_source_tcp_keepalives_interval,
            pg_source_tcp_user_timeout,
            pg_source_tcp_configure_server,
            pg_source_snapshot_statement_timeout,
            pg_source_wal_sender_timeout,
            mysql_source_timeouts,
            keep_n_source_status_history_entries,
            keep_n_sink_status_history_entries,
            keep_n_privatelink_status_history_entries,
            replica_status_history_retention_window,
            upsert_rocksdb_tuning_config,
            finalize_shards,
            tracing,
            upsert_auto_spill_config,
            storage_dataflow_max_inflight_bytes_config,
            grpc_client,
            shrink_upsert_unused_buffers_by_ratio,
            record_namespaced_errors,
            ssh_timeout_config,
            kafka_timeout_config,
            statistics_interval,
            statistics_collection_interval,
            pg_snapshot_config,
            user_storage_managed_collections_batch_duration,
            dyncfg_updates,
        } = proto;

        Ok(Self {
            pg_source_connect_timeout: pg_source_connect_timeout.into_rust()?,
            pg_source_tcp_keepalives_retries,
            pg_source_tcp_keepalives_idle: pg_source_tcp_keepalives_idle.into_rust()?,
            pg_source_tcp_keepalives_interval: pg_source_tcp_keepalives_interval.into_rust()?,
            pg_source_tcp_user_timeout: pg_source_tcp_user_timeout.into_rust()?,
            pg_source_tcp_configure_server,
            pg_source_snapshot_statement_timeout: pg_source_snapshot_statement_timeout
                .into_rust_if_some(
                    "ProtoStorageParameters::pg_source_snapshot_statement_timeout",
                )?,
            pg_source_wal_sender_timeout: pg_source_wal_sender_timeout.into_rust()?,
            mysql_source_timeouts: mysql_source_timeouts
                .into_rust_if_some("ProtoStorageParameters::mysql_source_timeouts")?,
            keep_n_source_status_history_entries: usize::cast_from(
                keep_n_source_status_history_entries,
            ),
            keep_n_sink_status_history_entries: usize::cast_from(
                keep_n_sink_status_history_entries,
            ),
            keep_n_privatelink_status_history_entries: usize::cast_from(
                keep_n_privatelink_status_history_entries,
            ),
            replica_status_history_retention_window: replica_status_history_retention_window
                .into_rust_if_some(
                    "ProtoStorageParameters::replica_status_history_retention_window",
                )?,
            upsert_rocksdb_tuning_config: upsert_rocksdb_tuning_config
                .into_rust_if_some("ProtoStorageParameters::upsert_rocksdb_tuning_config")?,
            finalize_shards,
            tracing: tracing.into_rust_if_some("ProtoStorageParameters::tracing")?,
            upsert_auto_spill_config: upsert_auto_spill_config
                .into_rust_if_some("ProtoStorageParameters::upsert_auto_spill_config")?,
            storage_dataflow_max_inflight_bytes_config: storage_dataflow_max_inflight_bytes_config
                .into_rust_if_some(
                    "ProtoStorageParameters::storage_dataflow_max_inflight_bytes_config",
                )?,
            grpc_client: grpc_client.into_rust_if_some("ProtoStorageParameters::grpc_client")?,
            shrink_upsert_unused_buffers_by_ratio: usize::cast_from(
                shrink_upsert_unused_buffers_by_ratio,
            ),
            record_namespaced_errors,
            ssh_timeout_config: ssh_timeout_config
                .into_rust_if_some("ProtoStorageParameters::ssh_timeout_config")?,
            kafka_timeout_config: kafka_timeout_config
                .into_rust_if_some("ProtoStorageParameters::kafka_timeout_config")?,
            statistics_interval: statistics_interval
                .into_rust_if_some("ProtoStorageParameters::statistics_interval")?,
            statistics_collection_interval: statistics_collection_interval
                .into_rust_if_some("ProtoStorageParameters::statistics_collection_interval")?,
            pg_snapshot_config: pg_snapshot_config
                .into_rust_if_some("ProtoStorageParameters::pg_snapshot_config")?,
            user_storage_managed_collections_batch_duration:
                user_storage_managed_collections_batch_duration.into_rust_if_some(
                    "ProtoStorageParameters::user_storage_managed_collections_batch_duration",
                )?,
            dyncfg_updates: dyncfg_updates.ok_or_else(|| {
                TryFromProtoError::missing_field("ProtoStorageParameters::dyncfg_updates")
            })?,
        })
    }
}
/// Flags stored in `StorageParameters::pg_snapshot_config`.
///
/// `PgSourceSnapshotConfig::new`, which `Default` delegates to, enables all
/// three flags.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct PgSourceSnapshotConfig {
/// NOTE(review): presumably requests an exact row count while snapshotting —
/// confirm with the consumer.
pub collect_strict_count: bool,
/// NOTE(review): presumably falls back to the exact count when an estimate is
/// unavailable — confirm with the consumer.
pub fallback_to_strict_count: bool,
/// NOTE(review): presumably makes the snapshot wait for the count to finish —
/// confirm with the consumer.
pub wait_for_count: bool,
}
impl PgSourceSnapshotConfig {
pub const fn new() -> Self {
PgSourceSnapshotConfig {
collect_strict_count: true,
fallback_to_strict_count: true,
wait_for_count: true,
}
}
}
impl Default for PgSourceSnapshotConfig {
fn default() -> Self {
Self::new()
}
}
impl RustType<ProtoPgSourceSnapshotConfig> for PgSourceSnapshotConfig {
    /// Copies the three flags into the protobuf message.
    fn into_proto(&self) -> ProtoPgSourceSnapshotConfig {
        // The type is `Copy`, so the flags can be destructured by value.
        let Self {
            collect_strict_count,
            fallback_to_strict_count,
            wait_for_count,
        } = *self;

        ProtoPgSourceSnapshotConfig {
            collect_strict_count,
            fallback_to_strict_count,
            wait_for_count,
        }
    }

    /// Copies the three flags out of the protobuf message.
    ///
    /// This conversion has no failure path; it always returns `Ok`.
    fn from_proto(proto: ProtoPgSourceSnapshotConfig) -> Result<Self, TryFromProtoError> {
        let ProtoPgSourceSnapshotConfig {
            collect_strict_count,
            fallback_to_strict_count,
            wait_for_count,
        } = proto;

        Ok(Self {
            collect_strict_count,
            fallback_to_strict_count,
            wait_for_count,
        })
    }
}
impl RustType<ProtoKafkaTimeouts> for mz_kafka_util::client::TimeoutConfig {
    /// Encodes the Kafka timeout settings as a `ProtoKafkaTimeouts` message.
    ///
    /// `keepalive` is copied as-is; every other field is converted with
    /// `into_proto` and wrapped in `Some`.
    fn into_proto(&self) -> ProtoKafkaTimeouts {
        ProtoKafkaTimeouts {
            keepalive: self.keepalive,
            socket_timeout: Some(self.socket_timeout.into_proto()),
            transaction_timeout: Some(self.transaction_timeout.into_proto()),
            socket_connection_setup_timeout: Some(
                self.socket_connection_setup_timeout.into_proto(),
            ),
            fetch_metadata_timeout: Some(self.fetch_metadata_timeout.into_proto()),
            progress_record_fetch_timeout: Some(self.progress_record_fetch_timeout.into_proto()),
            default_metadata_fetch_interval: Some(
                self.default_metadata_fetch_interval.into_proto(),
            ),
        }
    }

    /// Decodes a `ProtoKafkaTimeouts` message.
    ///
    /// Each optional field goes through `into_rust_if_some`, which is given
    /// the `ProtoKafkaTimeouts::<field>` path as its label, so the label
    /// names the message type that is actually being decoded. The first
    /// failed conversion is returned via `?`.
    fn from_proto(proto: ProtoKafkaTimeouts) -> Result<Self, TryFromProtoError> {
        Ok(mz_kafka_util::client::TimeoutConfig {
            keepalive: proto.keepalive,
            socket_timeout: proto
                .socket_timeout
                .into_rust_if_some("ProtoKafkaTimeouts::socket_timeout")?,
            transaction_timeout: proto
                .transaction_timeout
                .into_rust_if_some("ProtoKafkaTimeouts::transaction_timeout")?,
            socket_connection_setup_timeout: proto
                .socket_connection_setup_timeout
                .into_rust_if_some("ProtoKafkaTimeouts::socket_connection_setup_timeout")?,
            fetch_metadata_timeout: proto
                .fetch_metadata_timeout
                .into_rust_if_some("ProtoKafkaTimeouts::fetch_metadata_timeout")?,
            progress_record_fetch_timeout: proto
                .progress_record_fetch_timeout
                .into_rust_if_some("ProtoKafkaTimeouts::progress_record_fetch_timeout")?,
            default_metadata_fetch_interval: proto
                .default_metadata_fetch_interval
                .into_rust_if_some("ProtoKafkaTimeouts::default_metadata_fetch_interval")?,
        })
    }
}
impl RustType<ProtoMySqlSourceTimeouts> for mz_mysql_util::TimeoutConfig {
    /// Encodes the MySQL timeout settings, converting each field with `into_proto`.
    fn into_proto(&self) -> ProtoMySqlSourceTimeouts {
        // Exhaustive destructuring: a newly added field must be handled here.
        let mz_mysql_util::TimeoutConfig {
            tcp_keepalive,
            snapshot_max_execution_time,
            snapshot_lock_wait_timeout,
        } = self;

        ProtoMySqlSourceTimeouts {
            tcp_keepalive: tcp_keepalive.into_proto(),
            snapshot_max_execution_time: snapshot_max_execution_time.into_proto(),
            snapshot_lock_wait_timeout: snapshot_lock_wait_timeout.into_proto(),
        }
    }

    /// Decodes the MySQL timeout settings, converting each field with `into_rust`.
    ///
    /// The first failed conversion is returned via `?`.
    fn from_proto(proto: ProtoMySqlSourceTimeouts) -> Result<Self, TryFromProtoError> {
        let ProtoMySqlSourceTimeouts {
            tcp_keepalive,
            snapshot_max_execution_time,
            snapshot_lock_wait_timeout,
        } = proto;

        Ok(mz_mysql_util::TimeoutConfig {
            tcp_keepalive: tcp_keepalive.into_rust()?,
            snapshot_max_execution_time: snapshot_max_execution_time.into_rust()?,
            snapshot_lock_wait_timeout: snapshot_lock_wait_timeout.into_rust()?,
        })
    }
}
impl RustType<ProtoSshTimeoutConfig> for SshTimeoutConfig {
    /// Encodes the SSH timeout settings, wrapping each converted field in `Some`.
    fn into_proto(&self) -> ProtoSshTimeoutConfig {
        // Exhaustive destructuring: a newly added field must be handled here.
        let SshTimeoutConfig {
            check_interval,
            connect_timeout,
            keepalives_idle,
        } = self;

        ProtoSshTimeoutConfig {
            check_interval: Some(check_interval.into_proto()),
            connect_timeout: Some(connect_timeout.into_proto()),
            keepalives_idle: Some(keepalives_idle.into_proto()),
        }
    }

    /// Decodes the SSH timeout settings.
    ///
    /// Each field goes through `into_rust_if_some`, which is given the
    /// `ProtoSshTimeoutConfig::<field>` path as its label. The first failed
    /// conversion is returned via `?`.
    fn from_proto(proto: ProtoSshTimeoutConfig) -> Result<Self, TryFromProtoError> {
        let ProtoSshTimeoutConfig {
            check_interval,
            connect_timeout,
            keepalives_idle,
        } = proto;

        Ok(SshTimeoutConfig {
            check_interval: check_interval
                .into_rust_if_some("ProtoSshTimeoutConfig::check_interval")?,
            connect_timeout: connect_timeout
                .into_rust_if_some("ProtoSshTimeoutConfig::connect_timeout")?,
            keepalives_idle: keepalives_idle
                .into_rust_if_some("ProtoSshTimeoutConfig::keepalives_idle")?,
        })
    }
}