use mz_ore::cast::CastFrom;
use mz_persist_client::cfg::PersistParameters;
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use mz_service::params::GrpcClientParameters;
use mz_tracing::params::TracingParameters;
use serde::{Deserialize, Serialize};
// Textually includes a Rust source file from Cargo's build-output directory
// (`OUT_DIR`) at compile time.
// NOTE(review): presumably this is the protobuf-generated module that defines
// the `Proto*` types used by the `RustType` impls in this file — confirm
// against the crate's build script.
include!(concat!(
env!("OUT_DIR"),
"/mz_storage_client.types.parameters.rs"
));
/// Storage configuration parameters.
///
/// Convertible to and from `ProtoStorageParameters` through the `RustType`
/// impl in this file, and applied onto an existing value with
/// [`StorageParameters::update`].
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq)]
pub struct StorageParameters {
/// Persist client parameters; merged via `PersistParameters::update` when
/// applied through [`StorageParameters::update`].
pub persist: PersistParameters,
/// Connection timeout and TCP keepalive settings for Postgres replication
/// connections.
pub pg_replication_timeouts: mz_postgres_util::ReplicationTimeouts,
/// NOTE(review): presumably the number of source status history entries to
/// retain — confirm against the consumer of this value.
pub keep_n_source_status_history_entries: usize,
/// NOTE(review): presumably the number of sink status history entries to
/// retain — confirm against the consumer of this value.
pub keep_n_sink_status_history_entries: usize,
/// RocksDB tuning parameters for upsert.
pub upsert_rocksdb_tuning_config: mz_rocksdb_types::RocksDBTuningParameters,
/// NOTE(review): presumably toggles finalization of unused shards — confirm
/// against the consumer of this value.
pub finalize_shards: bool,
/// Tracing parameters; merged via `TracingParameters::update` when applied
/// through [`StorageParameters::update`].
pub tracing: TracingParameters,
/// Configuration for automatic spilling of upsert state to disk.
pub upsert_auto_spill_config: UpsertAutoSpillConfig,
/// Configuration of the in-flight byte limit for storage dataflows.
pub storage_dataflow_max_inflight_bytes_config: StorageMaxInflightBytesConfig,
/// gRPC client parameters; merged via `GrpcClientParameters::update` when
/// applied through [`StorageParameters::update`].
pub grpc_client: GrpcClientParameters,
/// Carried in the protobuf form as
/// `storage_dataflow_delay_sources_past_rehydration`.
/// NOTE(review): exact runtime effect is not visible from this file.
pub delay_sources_past_rehydration: bool,
/// NOTE(review): presumably a ratio controlling how aggressively unused
/// upsert buffers are shrunk — confirm units and the meaning of `0`.
pub shrink_upsert_unused_buffers_by_ratio: usize,
}
/// Configuration of the in-flight byte limit for storage dataflows.
///
/// The hand-written `Default` impl leaves both limits unset (`None`) and sets
/// `disk_only` to `true`.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct StorageMaxInflightBytesConfig {
/// Optional default limit, in bytes.
/// NOTE(review): how `None` is interpreted is decided by the consumer and
/// is not visible from this file.
pub max_inflight_bytes_default: Option<usize>,
/// Optional limit expressed as a fraction of the cluster size.
/// NOTE(review): valid range and precedence relative to
/// `max_inflight_bytes_default` are not visible here — confirm.
pub max_inflight_bytes_cluster_size_fraction: Option<f64>,
/// NOTE(review): presumably restricts the limit to disk-backed
/// configurations — confirm against the consumer of this value.
pub disk_only: bool,
}
impl Default for StorageMaxInflightBytesConfig {
fn default() -> Self {
Self {
max_inflight_bytes_default: Default::default(),
max_inflight_bytes_cluster_size_fraction: Default::default(),
disk_only: true,
}
}
}
impl RustType<ProtoStorageMaxInflightBytesConfig> for StorageMaxInflightBytesConfig {
fn into_proto(&self) -> ProtoStorageMaxInflightBytesConfig {
ProtoStorageMaxInflightBytesConfig {
max_in_flight_bytes_default: self.max_inflight_bytes_default.map(u64::cast_from),
max_in_flight_bytes_cluster_size_fraction: self
.max_inflight_bytes_cluster_size_fraction,
disk_only: self.disk_only,
}
}
fn from_proto(proto: ProtoStorageMaxInflightBytesConfig) -> Result<Self, TryFromProtoError> {
Ok(Self {
max_inflight_bytes_default: proto.max_in_flight_bytes_default.map(usize::cast_from),
max_inflight_bytes_cluster_size_fraction: proto
.max_in_flight_bytes_cluster_size_fraction,
disk_only: proto.disk_only,
})
}
}
/// Configuration for automatically spilling upsert state to disk.
///
/// The derived `Default` yields `allow_spilling_to_disk = false` and a
/// threshold of `0` bytes.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct UpsertAutoSpillConfig {
/// Whether spilling to disk is allowed.
pub allow_spilling_to_disk: bool,
/// Threshold, in bytes, associated with spilling to disk.
/// NOTE(review): presumably the in-memory size at which spilling begins —
/// confirm against the consumer of this value.
pub spill_to_disk_threshold_bytes: usize,
}
impl RustType<ProtoUpsertAutoSpillConfig> for UpsertAutoSpillConfig {
fn into_proto(&self) -> ProtoUpsertAutoSpillConfig {
ProtoUpsertAutoSpillConfig {
allow_spilling_to_disk: self.allow_spilling_to_disk,
spill_to_disk_threshold_bytes: u64::cast_from(self.spill_to_disk_threshold_bytes),
}
}
fn from_proto(proto: ProtoUpsertAutoSpillConfig) -> Result<Self, TryFromProtoError> {
Ok(Self {
allow_spilling_to_disk: proto.allow_spilling_to_disk,
spill_to_disk_threshold_bytes: usize::cast_from(proto.spill_to_disk_threshold_bytes),
})
}
}
impl StorageParameters {
    /// Applies the values in `other` (destructured in the parameter list) onto
    /// `self`.
    ///
    /// `persist`, `tracing`, and `grpc_client` are merged by delegating to
    /// their own `update` methods; every other field is overwritten with the
    /// incoming value.
    ///
    /// The parameter is destructured exhaustively so that adding a field to
    /// `StorageParameters` without handling it here is a compile error.
    pub fn update(
        &mut self,
        StorageParameters {
            persist,
            pg_replication_timeouts,
            keep_n_source_status_history_entries,
            keep_n_sink_status_history_entries,
            upsert_rocksdb_tuning_config,
            finalize_shards,
            tracing,
            upsert_auto_spill_config,
            storage_dataflow_max_inflight_bytes_config,
            grpc_client,
            delay_sources_past_rehydration,
            shrink_upsert_unused_buffers_by_ratio,
        }: StorageParameters,
    ) {
        // Sub-parameter structs that define their own merge semantics.
        self.persist.update(persist);
        self.tracing.update(tracing);
        self.grpc_client.update(grpc_client);

        // Everything else is replaced wholesale. Each field is assigned exactly
        // once (a duplicated `finalize_shards` assignment was removed).
        self.pg_replication_timeouts = pg_replication_timeouts;
        self.keep_n_source_status_history_entries = keep_n_source_status_history_entries;
        self.keep_n_sink_status_history_entries = keep_n_sink_status_history_entries;
        self.upsert_rocksdb_tuning_config = upsert_rocksdb_tuning_config;
        self.finalize_shards = finalize_shards;
        self.upsert_auto_spill_config = upsert_auto_spill_config;
        self.storage_dataflow_max_inflight_bytes_config =
            storage_dataflow_max_inflight_bytes_config;
        self.delay_sources_past_rehydration = delay_sources_past_rehydration;
        self.shrink_upsert_unused_buffers_by_ratio = shrink_upsert_unused_buffers_by_ratio;
    }
}
impl RustType<ProtoStorageParameters> for StorageParameters {
fn into_proto(&self) -> ProtoStorageParameters {
ProtoStorageParameters {
persist: Some(self.persist.into_proto()),
pg_replication_timeouts: Some(self.pg_replication_timeouts.into_proto()),
keep_n_source_status_history_entries: u64::cast_from(
self.keep_n_source_status_history_entries,
),
keep_n_sink_status_history_entries: u64::cast_from(
self.keep_n_sink_status_history_entries,
),
upsert_rocksdb_tuning_config: Some(self.upsert_rocksdb_tuning_config.into_proto()),
finalize_shards: self.finalize_shards,
tracing: Some(self.tracing.into_proto()),
upsert_auto_spill_config: Some(self.upsert_auto_spill_config.into_proto()),
storage_dataflow_max_inflight_bytes_config: Some(
self.storage_dataflow_max_inflight_bytes_config.into_proto(),
),
grpc_client: Some(self.grpc_client.into_proto()),
storage_dataflow_delay_sources_past_rehydration: self.delay_sources_past_rehydration,
shrink_upsert_unused_buffers_by_ratio: u64::cast_from(
self.shrink_upsert_unused_buffers_by_ratio,
),
}
}
fn from_proto(proto: ProtoStorageParameters) -> Result<Self, TryFromProtoError> {
Ok(Self {
persist: proto
.persist
.into_rust_if_some("ProtoStorageParameters::persist")?,
pg_replication_timeouts: proto
.pg_replication_timeouts
.into_rust_if_some("ProtoStorageParameters::pg_replication_timeouts")?,
keep_n_source_status_history_entries: usize::cast_from(
proto.keep_n_source_status_history_entries,
),
keep_n_sink_status_history_entries: usize::cast_from(
proto.keep_n_sink_status_history_entries,
),
upsert_rocksdb_tuning_config: proto
.upsert_rocksdb_tuning_config
.into_rust_if_some("ProtoStorageParameters::upsert_rocksdb_tuning_config")?,
finalize_shards: proto.finalize_shards,
tracing: proto
.tracing
.into_rust_if_some("ProtoStorageParameters::tracing")?,
upsert_auto_spill_config: proto
.upsert_auto_spill_config
.into_rust_if_some("ProtoStorageParameters::upsert_auto_spill_config")?,
storage_dataflow_max_inflight_bytes_config: proto
.storage_dataflow_max_inflight_bytes_config
.into_rust_if_some(
"ProtoStorageParameters::storage_dataflow_max_inflight_bytes_config",
)?,
grpc_client: proto
.grpc_client
.into_rust_if_some("ProtoStorageParameters::grpc_client")?,
delay_sources_past_rehydration: proto.storage_dataflow_delay_sources_past_rehydration,
shrink_upsert_unused_buffers_by_ratio: usize::cast_from(
proto.shrink_upsert_unused_buffers_by_ratio,
),
})
}
}
impl RustType<ProtoPgReplicationTimeouts> for mz_postgres_util::ReplicationTimeouts {
    /// Encodes the replication timeouts as their protobuf representation.
    ///
    /// `keepalives_retries` is copied as-is; every other field is encoded via
    /// its own `into_proto`.
    fn into_proto(&self) -> ProtoPgReplicationTimeouts {
        let Self {
            connect_timeout,
            keepalives_retries,
            keepalives_idle,
            keepalives_interval,
            tcp_user_timeout,
        } = self;

        ProtoPgReplicationTimeouts {
            connect_timeout: connect_timeout.into_proto(),
            keepalives_retries: *keepalives_retries,
            keepalives_idle: keepalives_idle.into_proto(),
            keepalives_interval: keepalives_interval.into_proto(),
            tcp_user_timeout: tcp_user_timeout.into_proto(),
        }
    }

    /// Decodes replication timeouts from their protobuf representation.
    ///
    /// # Errors
    ///
    /// Returns a `TryFromProtoError` if any of the converted fields fails to
    /// decode; fields are decoded in declaration order.
    fn from_proto(proto: ProtoPgReplicationTimeouts) -> Result<Self, TryFromProtoError> {
        let ProtoPgReplicationTimeouts {
            connect_timeout,
            keepalives_retries,
            keepalives_idle,
            keepalives_interval,
            tcp_user_timeout,
        } = proto;

        let timeouts = Self {
            connect_timeout: connect_timeout.into_rust()?,
            keepalives_retries,
            keepalives_idle: keepalives_idle.into_rust()?,
            keepalives_interval: keepalives_interval.into_rust()?,
            tcp_user_timeout: tcp_user_timeout.into_rust()?,
        };
        Ok(timeouts)
    }
}