1use std::time::Duration;
13
14use mz_service::params::GrpcClientParameters;
15use mz_ssh_util::tunnel::SshTimeoutConfig;
16use mz_tracing::params::TracingParameters;
17use serde::{Deserialize, Serialize};
18
/// Value that `StorageParameters::default()` wraps in `Some` for
/// `pg_source_connect_timeout`: 30 seconds.
pub const DEFAULT_PG_SOURCE_CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// Value that `StorageParameters::default()` wraps in `Some` for
/// `pg_source_tcp_keepalives_interval`: 10 seconds.
pub const DEFAULT_PG_SOURCE_TCP_KEEPALIVES_INTERVAL: Duration = Duration::from_secs(10);
/// Value that `StorageParameters::default()` wraps in `Some` for
/// `pg_source_tcp_keepalives_idle`: 10 seconds.
pub const DEFAULT_PG_SOURCE_TCP_KEEPALIVES_IDLE: Duration = Duration::from_secs(10);
/// Value that `StorageParameters::default()` wraps in `Some` for
/// `pg_source_tcp_keepalives_retries`: 5.
pub const DEFAULT_PG_SOURCE_TCP_KEEPALIVES_RETRIES: u32 = 5;
/// Value that `StorageParameters::default()` wraps in `Some` for
/// `pg_source_tcp_user_timeout`: 40 seconds.
pub const DEFAULT_PG_SOURCE_TCP_USER_TIMEOUT: Duration = Duration::from_secs(40);

/// Value that `StorageParameters::default()` assigns to
/// `pg_source_tcp_configure_server`: disabled.
pub const DEFAULT_PG_SOURCE_TCP_CONFIGURE_SERVER: bool = false;

/// Value that `StorageParameters::default()` assigns to
/// `pg_source_wal_sender_timeout`: `None`, i.e. no explicit timeout value.
// NOTE(review): what `None` means to the consumer of this setting is not
// visible in this file — confirm against the code that reads it.
pub const DEFAULT_PG_SOURCE_WAL_SENDER_TIMEOUT: Option<Duration> = None;
42
/// A bundle of storage-related configuration values.
///
/// The type is serializable through serde and comparable with `PartialEq`.
/// `StorageParameters::update` applies one instance on top of another, and
/// `StorageParameters::all_unset` reports whether an instance equals
/// `StorageParameters::default()`.
///
/// The per-field notes below are derived from the field names and types only;
/// the code consuming each setting is not part of this file.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct StorageParameters {
    // Postgres source settings. The `Option` fields default to `Some(..)` of
    // the matching `DEFAULT_PG_SOURCE_*` constant.
    pub pg_source_connect_timeout: Option<Duration>,
    pub pg_source_tcp_keepalives_retries: Option<u32>,
    pub pg_source_tcp_keepalives_idle: Option<Duration>,
    pub pg_source_tcp_keepalives_interval: Option<Duration>,
    pub pg_source_tcp_user_timeout: Option<Duration>,
    pub pg_source_tcp_configure_server: bool,
    pub pg_source_snapshot_statement_timeout: Duration,
    pub pg_source_wal_sender_timeout: Option<Duration>,
    // MySQL source timeouts, grouped in a type owned by `mz_mysql_util`.
    pub mysql_source_timeouts: mz_mysql_util::TimeoutConfig,
    // Status-history settings: three entry counts and one time window.
    pub keep_n_source_status_history_entries: usize,
    pub keep_n_sink_status_history_entries: usize,
    pub keep_n_privatelink_status_history_entries: usize,
    pub replica_status_history_retention_window: Duration,
    pub upsert_rocksdb_tuning_config: mz_rocksdb_types::RocksDBTuningParameters,
    pub finalize_shards: bool,
    // Merged through `TracingParameters::update` rather than overwritten when
    // `StorageParameters::update` is called.
    pub tracing: TracingParameters,
    pub storage_dataflow_max_inflight_bytes_config: StorageMaxInflightBytesConfig,
    // Merged through `GrpcClientParameters::update` rather than overwritten
    // when `StorageParameters::update` is called.
    pub grpc_client: GrpcClientParameters,
    pub shrink_upsert_unused_buffers_by_ratio: usize,
    // Defaults to `true`.
    pub record_namespaced_errors: bool,
    pub ssh_timeout_config: SshTimeoutConfig,
    pub kafka_timeout_config: mz_kafka_util::client::TimeoutConfig,
    // Defaults to `STATISTICS_INTERVAL_DEFAULT`.
    pub statistics_interval: Duration,
    // Defaults to `STATISTICS_COLLECTION_INTERVAL_DEFAULT`.
    pub statistics_collection_interval: Duration,
    pub pg_snapshot_config: PgSourceSnapshotConfig,
    // Defaults to `STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT`.
    pub user_storage_managed_collections_batch_duration: Duration,

    // Accumulated with `extend` (not replaced) by `StorageParameters::update`.
    pub dyncfg_updates: mz_dyncfg::ConfigUpdates,
}
105
106pub const STATISTICS_INTERVAL_DEFAULT: Duration = Duration::from_secs(60);
107pub const STATISTICS_COLLECTION_INTERVAL_DEFAULT: Duration = Duration::from_secs(10);
108pub const STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT: Duration = Duration::from_secs(1);
109pub const REPLICA_STATUS_HISTORY_RETENTION_WINDOW_DEFAULT: Duration =
110 Duration::from_secs(30 * 24 * 60 * 60); impl Default for StorageParameters {
115 fn default() -> Self {
116 Self {
117 pg_source_connect_timeout: Some(DEFAULT_PG_SOURCE_CONNECT_TIMEOUT),
118 pg_source_tcp_keepalives_retries: Some(DEFAULT_PG_SOURCE_TCP_KEEPALIVES_RETRIES),
119 pg_source_tcp_keepalives_idle: Some(DEFAULT_PG_SOURCE_TCP_KEEPALIVES_IDLE),
120 pg_source_tcp_keepalives_interval: Some(DEFAULT_PG_SOURCE_TCP_KEEPALIVES_INTERVAL),
121 pg_source_tcp_user_timeout: Some(DEFAULT_PG_SOURCE_TCP_USER_TIMEOUT),
122 pg_source_snapshot_statement_timeout:
123 mz_postgres_util::DEFAULT_SNAPSHOT_STATEMENT_TIMEOUT,
124 pg_source_wal_sender_timeout: DEFAULT_PG_SOURCE_WAL_SENDER_TIMEOUT,
125 pg_source_tcp_configure_server: DEFAULT_PG_SOURCE_TCP_CONFIGURE_SERVER,
126 mysql_source_timeouts: Default::default(),
127 keep_n_source_status_history_entries: Default::default(),
128 keep_n_sink_status_history_entries: Default::default(),
129 keep_n_privatelink_status_history_entries: Default::default(),
130 replica_status_history_retention_window:
131 REPLICA_STATUS_HISTORY_RETENTION_WINDOW_DEFAULT,
132 upsert_rocksdb_tuning_config: Default::default(),
133 finalize_shards: Default::default(),
134 tracing: Default::default(),
135 storage_dataflow_max_inflight_bytes_config: Default::default(),
136 grpc_client: Default::default(),
137 shrink_upsert_unused_buffers_by_ratio: Default::default(),
138 record_namespaced_errors: true,
139 ssh_timeout_config: Default::default(),
140 kafka_timeout_config: Default::default(),
141 statistics_interval: STATISTICS_INTERVAL_DEFAULT,
142 statistics_collection_interval: STATISTICS_COLLECTION_INTERVAL_DEFAULT,
143 pg_snapshot_config: Default::default(),
144 user_storage_managed_collections_batch_duration:
145 STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT,
146 dyncfg_updates: Default::default(),
147 }
148 }
149}
150
/// Settings stored in `StorageParameters::storage_dataflow_max_inflight_bytes_config`.
///
/// The `Default` implementation leaves both optional values as `None` and
/// sets `disk_only` to `true`.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct StorageMaxInflightBytesConfig {
    // Optional byte count; `None` by default.
    pub max_inflight_bytes_default: Option<usize>,
    // Optional fraction; `None` by default.
    // NOTE(review): presumably a fraction of the cluster size, per the field
    // name — confirm against the consumer.
    pub max_inflight_bytes_cluster_size_fraction: Option<f64>,
    // `true` by default.
    pub disk_only: bool,
}
161
162impl Default for StorageMaxInflightBytesConfig {
163 fn default() -> Self {
164 Self {
165 max_inflight_bytes_default: Default::default(),
166 max_inflight_bytes_cluster_size_fraction: Default::default(),
167 disk_only: true,
168 }
169 }
170}
171
172impl StorageParameters {
173 pub fn update(
178 &mut self,
179 StorageParameters {
180 pg_source_connect_timeout,
181 pg_source_tcp_keepalives_retries,
182 pg_source_tcp_keepalives_idle,
183 pg_source_tcp_keepalives_interval,
184 pg_source_tcp_user_timeout,
185 pg_source_tcp_configure_server,
186 pg_source_snapshot_statement_timeout,
187 pg_source_wal_sender_timeout,
188 mysql_source_timeouts,
189 keep_n_source_status_history_entries,
190 keep_n_sink_status_history_entries,
191 keep_n_privatelink_status_history_entries,
192 replica_status_history_retention_window,
193 upsert_rocksdb_tuning_config,
194 finalize_shards,
195 tracing,
196 storage_dataflow_max_inflight_bytes_config,
197 grpc_client,
198 shrink_upsert_unused_buffers_by_ratio,
199 record_namespaced_errors,
200 ssh_timeout_config,
201 kafka_timeout_config,
202 statistics_interval,
203 statistics_collection_interval,
204 pg_snapshot_config,
205 user_storage_managed_collections_batch_duration,
206 dyncfg_updates,
207 }: StorageParameters,
208 ) {
209 self.pg_source_connect_timeout = pg_source_connect_timeout;
210 self.pg_source_tcp_keepalives_retries = pg_source_tcp_keepalives_retries;
211 self.pg_source_tcp_keepalives_idle = pg_source_tcp_keepalives_idle;
212 self.pg_source_tcp_keepalives_interval = pg_source_tcp_keepalives_interval;
213 self.pg_source_tcp_user_timeout = pg_source_tcp_user_timeout;
214 self.pg_source_tcp_configure_server = pg_source_tcp_configure_server;
215 self.pg_source_snapshot_statement_timeout = pg_source_snapshot_statement_timeout;
216 self.pg_source_wal_sender_timeout = pg_source_wal_sender_timeout;
217 self.mysql_source_timeouts = mysql_source_timeouts;
218 self.keep_n_source_status_history_entries = keep_n_source_status_history_entries;
219 self.keep_n_sink_status_history_entries = keep_n_sink_status_history_entries;
220 self.keep_n_privatelink_status_history_entries = keep_n_privatelink_status_history_entries;
221 self.replica_status_history_retention_window = replica_status_history_retention_window;
222 self.upsert_rocksdb_tuning_config = upsert_rocksdb_tuning_config;
223 self.finalize_shards = finalize_shards;
224 self.tracing.update(tracing);
225 self.finalize_shards = finalize_shards;
226 self.storage_dataflow_max_inflight_bytes_config =
227 storage_dataflow_max_inflight_bytes_config;
228 self.grpc_client.update(grpc_client);
229 self.shrink_upsert_unused_buffers_by_ratio = shrink_upsert_unused_buffers_by_ratio;
230 self.record_namespaced_errors = record_namespaced_errors;
231 self.ssh_timeout_config = ssh_timeout_config;
232 self.kafka_timeout_config = kafka_timeout_config;
233 self.statistics_interval = statistics_interval;
235 self.statistics_collection_interval = statistics_collection_interval;
236 self.pg_snapshot_config = pg_snapshot_config;
237 self.user_storage_managed_collections_batch_duration =
238 user_storage_managed_collections_batch_duration;
239 self.dyncfg_updates.extend(dyncfg_updates);
240 }
241
242 pub fn all_unset(&self) -> bool {
244 *self == Self::default()
245 }
246}
247
/// Settings stored in `StorageParameters::pg_snapshot_config`.
///
/// The type is `Copy`; `PgSourceSnapshotConfig::new` and the `Default`
/// implementation produce the same value.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct PgSourceSnapshotConfig {
    // `true` in the value returned by `new()` / `default()`.
    // NOTE(review): presumably selects an exact (rather than estimated) row
    // count during snapshots, per the field name — confirm against the consumer.
    pub collect_strict_count: bool,
}
256
257impl PgSourceSnapshotConfig {
258 pub const fn new() -> Self {
259 PgSourceSnapshotConfig {
260 collect_strict_count: true,
262 }
263 }
264}
265
266impl Default for PgSourceSnapshotConfig {
267 fn default() -> Self {
268 Self::new()
269 }
270}