mz_storage_types/
parameters.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Configuration parameter types.
11
12use std::time::Duration;
13
14use mz_ore::cast::CastFrom;
15use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
16use mz_service::params::GrpcClientParameters;
17use mz_ssh_util::tunnel::SshTimeoutConfig;
18use mz_tracing::params::TracingParameters;
19use serde::{Deserialize, Serialize};
20
21include!(concat!(env!("OUT_DIR"), "/mz_storage_types.parameters.rs"));
22
23// Some of these defaults were recommended by @ph14
24// https://github.com/MaterializeInc/materialize/pull/18644#discussion_r1160071692
25pub const DEFAULT_PG_SOURCE_CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
26pub const DEFAULT_PG_SOURCE_TCP_KEEPALIVES_INTERVAL: Duration = Duration::from_secs(10);
27pub const DEFAULT_PG_SOURCE_TCP_KEEPALIVES_IDLE: Duration = Duration::from_secs(10);
28pub const DEFAULT_PG_SOURCE_TCP_KEEPALIVES_RETRIES: u32 = 5;
29// This is meant to be DEFAULT_KEEPALIVE_IDLE
30// + DEFAULT_KEEPALIVE_RETRIES * DEFAULT_KEEPALIVE_INTERVAL
31pub const DEFAULT_PG_SOURCE_TCP_USER_TIMEOUT: Duration = Duration::from_secs(40);
32
33/// Whether to apply TCP settings to the server as well as the client.
34///
35/// These option are generally considered something that the upstream DBA should
36/// configure, so we don't override them by default.
37pub const DEFAULT_PG_SOURCE_TCP_CONFIGURE_SERVER: bool = false;
38
39// The default value for the `wal_sender_timeout` option for PostgreSQL sources.
40//
41// See: <https://www.postgresql.org/docs/current/runtime-config-replication.html>
42//
43// This option is generally considered something that the upstream DBA should
44// configure, so we don't override it by default.
45pub const DEFAULT_PG_SOURCE_WAL_SENDER_TIMEOUT: Option<Duration> = None;
46
47/// Storage instance configuration parameters.
48///
49/// Parameters can be set (`Some`) or unset (`None`).
50/// Unset parameters should be interpreted to mean "use the previous value".
51#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
52pub struct StorageParameters {
53    pub pg_source_connect_timeout: Option<Duration>,
54    pub pg_source_tcp_keepalives_retries: Option<u32>,
55    pub pg_source_tcp_keepalives_idle: Option<Duration>,
56    pub pg_source_tcp_keepalives_interval: Option<Duration>,
57    pub pg_source_tcp_user_timeout: Option<Duration>,
58    /// Whether to apply the configuration on the server as well.
59    ///
60    /// By default, the timeouts are only applied on the client.
61    pub pg_source_tcp_configure_server: bool,
62    pub pg_source_snapshot_statement_timeout: Duration,
63    pub pg_source_wal_sender_timeout: Option<Duration>,
64    pub mysql_source_timeouts: mz_mysql_util::TimeoutConfig,
65    pub keep_n_source_status_history_entries: usize,
66    pub keep_n_sink_status_history_entries: usize,
67    pub keep_n_privatelink_status_history_entries: usize,
68    pub replica_status_history_retention_window: Duration,
69    /// A set of parameters used to tune RocksDB when used with `UPSERT` sources.
70    pub upsert_rocksdb_tuning_config: mz_rocksdb_types::RocksDBTuningParameters,
71    /// Whether or not to allow shard finalization to occur. Note that this will
72    /// only disable the actual finalization of shards, not registering them for
73    /// finalization.
74    pub finalize_shards: bool,
75    pub tracing: TracingParameters,
76    /// A set of parameters used to configure auto spill behaviour if disk is used.
77    pub upsert_auto_spill_config: UpsertAutoSpillConfig,
78    /// A set of parameters used to configure the maximum number of in-flight bytes
79    /// emitted by persist_sources feeding storage dataflows
80    pub storage_dataflow_max_inflight_bytes_config: StorageMaxInflightBytesConfig,
81    /// gRPC client parameters.
82    pub grpc_client: GrpcClientParameters,
83    /// Configuration ratio to shrink upsert buffers by.
84    pub shrink_upsert_unused_buffers_by_ratio: usize,
85    /// Whether or not to record errors by namespace in the `details`
86    /// column of the status history tables.
87    pub record_namespaced_errors: bool,
88    /// Networking configuration for ssh connections.
89    pub ssh_timeout_config: SshTimeoutConfig,
90    /// Networking configuration for kafka connections.
91    pub kafka_timeout_config: mz_kafka_util::client::TimeoutConfig,
92    /// The interval to emit records to statistics tables
93    pub statistics_interval: Duration,
94    /// The interval to _collect statistics_ within clusterd.
95    // Note: this interval configures the level of granularity we expect statistics
96    // (at least with this implementation) to have. We expect a statistic in the
97    // system tables to be only accurate to within this interval + whatever
98    // skew the `CollectionManager` adds. The stats task in the controller will
99    // be reporting, for each worker, on some interval,
100    // the statistics reported by the most recent collection of statistics as
101    // defined by this interval. This is known to be somewhat inaccurate,
102    // but people mostly care about either rates, or the values to within 1 minute.
103    pub statistics_collection_interval: Duration,
104    pub pg_snapshot_config: PgSourceSnapshotConfig,
105    /// Duration that we wait to batch rows for user owned, storage managed, collections.
106    pub user_storage_managed_collections_batch_duration: Duration,
107
108    /// Updates used to update `StorageConfiguration::config_set`.
109    pub dyncfg_updates: mz_dyncfg::ConfigUpdates,
110}
111
112pub const STATISTICS_INTERVAL_DEFAULT: Duration = Duration::from_secs(60);
113pub const STATISTICS_COLLECTION_INTERVAL_DEFAULT: Duration = Duration::from_secs(10);
114pub const STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT: Duration = Duration::from_secs(1);
115pub const REPLICA_STATUS_HISTORY_RETENTION_WINDOW_DEFAULT: Duration =
116    Duration::from_secs(30 * 24 * 60 * 60); // 30 days
117
118// Implement `Default` manually, so that the default can match the
119// LD default. This is not strictly necessary, but improves clarity.
120impl Default for StorageParameters {
121    fn default() -> Self {
122        Self {
123            pg_source_connect_timeout: Some(DEFAULT_PG_SOURCE_CONNECT_TIMEOUT),
124            pg_source_tcp_keepalives_retries: Some(DEFAULT_PG_SOURCE_TCP_KEEPALIVES_RETRIES),
125            pg_source_tcp_keepalives_idle: Some(DEFAULT_PG_SOURCE_TCP_KEEPALIVES_IDLE),
126            pg_source_tcp_keepalives_interval: Some(DEFAULT_PG_SOURCE_TCP_KEEPALIVES_INTERVAL),
127            pg_source_tcp_user_timeout: Some(DEFAULT_PG_SOURCE_TCP_USER_TIMEOUT),
128            pg_source_snapshot_statement_timeout:
129                mz_postgres_util::DEFAULT_SNAPSHOT_STATEMENT_TIMEOUT,
130            pg_source_wal_sender_timeout: DEFAULT_PG_SOURCE_WAL_SENDER_TIMEOUT,
131            pg_source_tcp_configure_server: DEFAULT_PG_SOURCE_TCP_CONFIGURE_SERVER,
132            mysql_source_timeouts: Default::default(),
133            keep_n_source_status_history_entries: Default::default(),
134            keep_n_sink_status_history_entries: Default::default(),
135            keep_n_privatelink_status_history_entries: Default::default(),
136            replica_status_history_retention_window:
137                REPLICA_STATUS_HISTORY_RETENTION_WINDOW_DEFAULT,
138            upsert_rocksdb_tuning_config: Default::default(),
139            finalize_shards: Default::default(),
140            tracing: Default::default(),
141            upsert_auto_spill_config: Default::default(),
142            storage_dataflow_max_inflight_bytes_config: Default::default(),
143            grpc_client: Default::default(),
144            shrink_upsert_unused_buffers_by_ratio: Default::default(),
145            record_namespaced_errors: true,
146            ssh_timeout_config: Default::default(),
147            kafka_timeout_config: Default::default(),
148            statistics_interval: STATISTICS_INTERVAL_DEFAULT,
149            statistics_collection_interval: STATISTICS_COLLECTION_INTERVAL_DEFAULT,
150            pg_snapshot_config: Default::default(),
151            user_storage_managed_collections_batch_duration:
152                STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT,
153            dyncfg_updates: Default::default(),
154        }
155    }
156}
157
158#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
159pub struct StorageMaxInflightBytesConfig {
160    /// The default value for the max in-flight bytes.
161    pub max_inflight_bytes_default: Option<usize>,
162    /// Fraction of the memory limit of the cluster in use to be used as the
163    /// max in-flight bytes for backpressure.
164    pub max_inflight_bytes_cluster_size_fraction: Option<f64>,
165    /// Whether or not the above configs only apply to disk-using dataflows.
166    pub disk_only: bool,
167}
168
169impl Default for StorageMaxInflightBytesConfig {
170    fn default() -> Self {
171        Self {
172            max_inflight_bytes_default: Default::default(),
173            max_inflight_bytes_cluster_size_fraction: Default::default(),
174            disk_only: true,
175        }
176    }
177}
178
179impl RustType<ProtoStorageMaxInflightBytesConfig> for StorageMaxInflightBytesConfig {
180    fn into_proto(&self) -> ProtoStorageMaxInflightBytesConfig {
181        ProtoStorageMaxInflightBytesConfig {
182            max_in_flight_bytes_default: self.max_inflight_bytes_default.map(u64::cast_from),
183            max_in_flight_bytes_cluster_size_fraction: self
184                .max_inflight_bytes_cluster_size_fraction,
185            disk_only: self.disk_only,
186        }
187    }
188    fn from_proto(proto: ProtoStorageMaxInflightBytesConfig) -> Result<Self, TryFromProtoError> {
189        Ok(Self {
190            max_inflight_bytes_default: proto.max_in_flight_bytes_default.map(usize::cast_from),
191            max_inflight_bytes_cluster_size_fraction: proto
192                .max_in_flight_bytes_cluster_size_fraction,
193            disk_only: proto.disk_only,
194        })
195    }
196}
197
198#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
199pub struct UpsertAutoSpillConfig {
200    /// A flag to whether allow automatically spilling to disk or not
201    pub allow_spilling_to_disk: bool,
202    /// The size in bytes of the upsert state after which rocksdb will be used
203    /// instead of in memory hashmap
204    pub spill_to_disk_threshold_bytes: usize,
205}
206
207impl RustType<ProtoUpsertAutoSpillConfig> for UpsertAutoSpillConfig {
208    fn into_proto(&self) -> ProtoUpsertAutoSpillConfig {
209        ProtoUpsertAutoSpillConfig {
210            allow_spilling_to_disk: self.allow_spilling_to_disk,
211            spill_to_disk_threshold_bytes: u64::cast_from(self.spill_to_disk_threshold_bytes),
212        }
213    }
214    fn from_proto(proto: ProtoUpsertAutoSpillConfig) -> Result<Self, TryFromProtoError> {
215        Ok(Self {
216            allow_spilling_to_disk: proto.allow_spilling_to_disk,
217            spill_to_disk_threshold_bytes: usize::cast_from(proto.spill_to_disk_threshold_bytes),
218        })
219    }
220}
221
222impl StorageParameters {
223    /// Update the parameter values with the set ones from `other`.
224    ///
225    /// NOTE that this does not necessarily update all global state related
226    /// to these parameters, like persist parameters.
227    pub fn update(
228        &mut self,
229        StorageParameters {
230            pg_source_connect_timeout,
231            pg_source_tcp_keepalives_retries,
232            pg_source_tcp_keepalives_idle,
233            pg_source_tcp_keepalives_interval,
234            pg_source_tcp_user_timeout,
235            pg_source_tcp_configure_server,
236            pg_source_snapshot_statement_timeout,
237            pg_source_wal_sender_timeout,
238            mysql_source_timeouts,
239            keep_n_source_status_history_entries,
240            keep_n_sink_status_history_entries,
241            keep_n_privatelink_status_history_entries,
242            replica_status_history_retention_window,
243            upsert_rocksdb_tuning_config,
244            finalize_shards,
245            tracing,
246            upsert_auto_spill_config,
247            storage_dataflow_max_inflight_bytes_config,
248            grpc_client,
249            shrink_upsert_unused_buffers_by_ratio,
250            record_namespaced_errors,
251            ssh_timeout_config,
252            kafka_timeout_config,
253            statistics_interval,
254            statistics_collection_interval,
255            pg_snapshot_config,
256            user_storage_managed_collections_batch_duration,
257            dyncfg_updates,
258        }: StorageParameters,
259    ) {
260        self.pg_source_connect_timeout = pg_source_connect_timeout;
261        self.pg_source_tcp_keepalives_retries = pg_source_tcp_keepalives_retries;
262        self.pg_source_tcp_keepalives_idle = pg_source_tcp_keepalives_idle;
263        self.pg_source_tcp_keepalives_interval = pg_source_tcp_keepalives_interval;
264        self.pg_source_tcp_user_timeout = pg_source_tcp_user_timeout;
265        self.pg_source_tcp_configure_server = pg_source_tcp_configure_server;
266        self.pg_source_snapshot_statement_timeout = pg_source_snapshot_statement_timeout;
267        self.pg_source_wal_sender_timeout = pg_source_wal_sender_timeout;
268        self.mysql_source_timeouts = mysql_source_timeouts;
269        self.keep_n_source_status_history_entries = keep_n_source_status_history_entries;
270        self.keep_n_sink_status_history_entries = keep_n_sink_status_history_entries;
271        self.keep_n_privatelink_status_history_entries = keep_n_privatelink_status_history_entries;
272        self.replica_status_history_retention_window = replica_status_history_retention_window;
273        self.upsert_rocksdb_tuning_config = upsert_rocksdb_tuning_config;
274        self.finalize_shards = finalize_shards;
275        self.tracing.update(tracing);
276        self.finalize_shards = finalize_shards;
277        self.upsert_auto_spill_config = upsert_auto_spill_config;
278        self.storage_dataflow_max_inflight_bytes_config =
279            storage_dataflow_max_inflight_bytes_config;
280        self.grpc_client.update(grpc_client);
281        self.shrink_upsert_unused_buffers_by_ratio = shrink_upsert_unused_buffers_by_ratio;
282        self.record_namespaced_errors = record_namespaced_errors;
283        self.ssh_timeout_config = ssh_timeout_config;
284        self.kafka_timeout_config = kafka_timeout_config;
285        // We set this in the statistics scraper tasks only once at startup.
286        self.statistics_interval = statistics_interval;
287        self.statistics_collection_interval = statistics_collection_interval;
288        self.pg_snapshot_config = pg_snapshot_config;
289        self.user_storage_managed_collections_batch_duration =
290            user_storage_managed_collections_batch_duration;
291        self.dyncfg_updates.extend(dyncfg_updates);
292    }
293
294    /// Return whether all parameters are unset.
295    pub fn all_unset(&self) -> bool {
296        *self == Self::default()
297    }
298}
299
300impl RustType<ProtoStorageParameters> for StorageParameters {
301    fn into_proto(&self) -> ProtoStorageParameters {
302        ProtoStorageParameters {
303            pg_source_connect_timeout: self.pg_source_connect_timeout.into_proto(),
304            pg_source_tcp_keepalives_retries: self.pg_source_tcp_keepalives_retries,
305            pg_source_tcp_keepalives_idle: self.pg_source_tcp_keepalives_idle.into_proto(),
306            pg_source_tcp_keepalives_interval: self.pg_source_tcp_keepalives_interval.into_proto(),
307            pg_source_tcp_user_timeout: self.pg_source_tcp_user_timeout.into_proto(),
308            pg_source_snapshot_statement_timeout: Some(
309                self.pg_source_snapshot_statement_timeout.into_proto(),
310            ),
311            pg_source_tcp_configure_server: self.pg_source_tcp_configure_server,
312            pg_source_wal_sender_timeout: self.pg_source_wal_sender_timeout.into_proto(),
313            mysql_source_timeouts: Some(self.mysql_source_timeouts.into_proto()),
314            keep_n_source_status_history_entries: u64::cast_from(
315                self.keep_n_source_status_history_entries,
316            ),
317            keep_n_sink_status_history_entries: u64::cast_from(
318                self.keep_n_sink_status_history_entries,
319            ),
320            keep_n_privatelink_status_history_entries: u64::cast_from(
321                self.keep_n_privatelink_status_history_entries,
322            ),
323            replica_status_history_retention_window: Some(
324                self.replica_status_history_retention_window.into_proto(),
325            ),
326            upsert_rocksdb_tuning_config: Some(self.upsert_rocksdb_tuning_config.into_proto()),
327            finalize_shards: self.finalize_shards,
328            tracing: Some(self.tracing.into_proto()),
329            upsert_auto_spill_config: Some(self.upsert_auto_spill_config.into_proto()),
330            storage_dataflow_max_inflight_bytes_config: Some(
331                self.storage_dataflow_max_inflight_bytes_config.into_proto(),
332            ),
333            grpc_client: Some(self.grpc_client.into_proto()),
334            shrink_upsert_unused_buffers_by_ratio: u64::cast_from(
335                self.shrink_upsert_unused_buffers_by_ratio,
336            ),
337            record_namespaced_errors: self.record_namespaced_errors,
338            ssh_timeout_config: Some(self.ssh_timeout_config.into_proto()),
339            kafka_timeout_config: Some(self.kafka_timeout_config.into_proto()),
340            statistics_interval: Some(self.statistics_interval.into_proto()),
341            statistics_collection_interval: Some(self.statistics_collection_interval.into_proto()),
342            pg_snapshot_config: Some(self.pg_snapshot_config.into_proto()),
343            user_storage_managed_collections_batch_duration: Some(
344                self.user_storage_managed_collections_batch_duration
345                    .into_proto(),
346            ),
347            dyncfg_updates: Some(self.dyncfg_updates.clone()),
348        }
349    }
350
351    fn from_proto(proto: ProtoStorageParameters) -> Result<Self, TryFromProtoError> {
352        Ok(Self {
353            pg_source_connect_timeout: proto.pg_source_connect_timeout.into_rust()?,
354            pg_source_tcp_keepalives_retries: proto.pg_source_tcp_keepalives_retries,
355            pg_source_tcp_keepalives_idle: proto.pg_source_tcp_keepalives_idle.into_rust()?,
356            pg_source_tcp_keepalives_interval: proto
357                .pg_source_tcp_keepalives_interval
358                .into_rust()?,
359            pg_source_tcp_user_timeout: proto.pg_source_tcp_user_timeout.into_rust()?,
360            pg_source_tcp_configure_server: proto.pg_source_tcp_configure_server,
361            pg_source_snapshot_statement_timeout: proto
362                .pg_source_snapshot_statement_timeout
363                .into_rust_if_some(
364                    "ProtoStorageParameters::pg_source_snapshot_statement_timeout",
365                )?,
366            pg_source_wal_sender_timeout: proto.pg_source_wal_sender_timeout.into_rust()?,
367            mysql_source_timeouts: proto
368                .mysql_source_timeouts
369                .into_rust_if_some("ProtoStorageParameters::mysql_source_timeouts")?,
370            keep_n_source_status_history_entries: usize::cast_from(
371                proto.keep_n_source_status_history_entries,
372            ),
373            keep_n_sink_status_history_entries: usize::cast_from(
374                proto.keep_n_sink_status_history_entries,
375            ),
376            keep_n_privatelink_status_history_entries: usize::cast_from(
377                proto.keep_n_privatelink_status_history_entries,
378            ),
379            replica_status_history_retention_window: proto
380                .replica_status_history_retention_window
381                .into_rust_if_some(
382                    "ProtoStorageParameters::replica_status_history_retention_window",
383                )?,
384            upsert_rocksdb_tuning_config: proto
385                .upsert_rocksdb_tuning_config
386                .into_rust_if_some("ProtoStorageParameters::upsert_rocksdb_tuning_config")?,
387            finalize_shards: proto.finalize_shards,
388            tracing: proto
389                .tracing
390                .into_rust_if_some("ProtoStorageParameters::tracing")?,
391            upsert_auto_spill_config: proto
392                .upsert_auto_spill_config
393                .into_rust_if_some("ProtoStorageParameters::upsert_auto_spill_config")?,
394            storage_dataflow_max_inflight_bytes_config: proto
395                .storage_dataflow_max_inflight_bytes_config
396                .into_rust_if_some(
397                    "ProtoStorageParameters::storage_dataflow_max_inflight_bytes_config",
398                )?,
399            grpc_client: proto
400                .grpc_client
401                .into_rust_if_some("ProtoStorageParameters::grpc_client")?,
402            shrink_upsert_unused_buffers_by_ratio: usize::cast_from(
403                proto.shrink_upsert_unused_buffers_by_ratio,
404            ),
405            record_namespaced_errors: proto.record_namespaced_errors,
406            ssh_timeout_config: proto
407                .ssh_timeout_config
408                .into_rust_if_some("ProtoStorageParameters::ssh_timeout_config")?,
409            kafka_timeout_config: proto
410                .kafka_timeout_config
411                .into_rust_if_some("ProtoStorageParameters::kafka_timeout_config")?,
412            statistics_interval: proto
413                .statistics_interval
414                .into_rust_if_some("ProtoStorageParameters::statistics_interval")?,
415            statistics_collection_interval: proto
416                .statistics_collection_interval
417                .into_rust_if_some("ProtoStorageParameters::statistics_collection_interval")?,
418            pg_snapshot_config: proto
419                .pg_snapshot_config
420                .into_rust_if_some("ProtoStorageParameters::pg_snapshot_config")?,
421            user_storage_managed_collections_batch_duration: proto
422                .user_storage_managed_collections_batch_duration
423                .into_rust_if_some(
424                    "ProtoStorageParameters::user_storage_managed_collections_batch_duration",
425                )?,
426            dyncfg_updates: proto.dyncfg_updates.ok_or_else(|| {
427                TryFromProtoError::missing_field("ProtoStorageParameters::dyncfg_updates")
428            })?,
429        })
430    }
431}
432
433/// Configuration for how storage performs Postgres snapshots.
434#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
435pub struct PgSourceSnapshotConfig {
436    /// Whether or not to collect a strict `count(*)` for each table during snapshotting.
437    /// This is more accurate but way more expensive compared to an estimate in `pg_class`.
438    pub collect_strict_count: bool,
439    /// If a table is not vacuumed or analyzed (usually if it has a low number of writes, or low
440    /// number of rows in general) in postgres, the estimate count in `pg_class` is just `-1.
441    /// This config controls whether we should fallback to `count(*)` in that case. It does
442    /// nothing if `collect_strict_count=true`.
443    pub fallback_to_strict_count: bool,
444    /// Whether or not we wait for `count(*)` to finish before beginning to read the snapshot for
445    /// the given table.
446    pub wait_for_count: bool,
447}
448
449impl PgSourceSnapshotConfig {
450    pub const fn new() -> Self {
451        PgSourceSnapshotConfig {
452            // We want accurate values, if its not too expensive.
453            collect_strict_count: true,
454            fallback_to_strict_count: true,
455            // For now, wait to start snapshotting until after we have the count.
456            wait_for_count: true,
457        }
458    }
459}
460
461impl Default for PgSourceSnapshotConfig {
462    fn default() -> Self {
463        Self::new()
464    }
465}
466
467impl RustType<ProtoPgSourceSnapshotConfig> for PgSourceSnapshotConfig {
468    fn into_proto(&self) -> ProtoPgSourceSnapshotConfig {
469        ProtoPgSourceSnapshotConfig {
470            collect_strict_count: self.collect_strict_count,
471            fallback_to_strict_count: self.fallback_to_strict_count,
472            wait_for_count: self.wait_for_count,
473        }
474    }
475
476    fn from_proto(proto: ProtoPgSourceSnapshotConfig) -> Result<Self, TryFromProtoError> {
477        Ok(PgSourceSnapshotConfig {
478            collect_strict_count: proto.collect_strict_count,
479            fallback_to_strict_count: proto.fallback_to_strict_count,
480            wait_for_count: proto.wait_for_count,
481        })
482    }
483}
484
485impl RustType<ProtoKafkaTimeouts> for mz_kafka_util::client::TimeoutConfig {
486    fn into_proto(&self) -> ProtoKafkaTimeouts {
487        ProtoKafkaTimeouts {
488            keepalive: self.keepalive,
489            socket_timeout: Some(self.socket_timeout.into_proto()),
490            transaction_timeout: Some(self.transaction_timeout.into_proto()),
491            socket_connection_setup_timeout: Some(
492                self.socket_connection_setup_timeout.into_proto(),
493            ),
494            fetch_metadata_timeout: Some(self.fetch_metadata_timeout.into_proto()),
495            progress_record_fetch_timeout: Some(self.progress_record_fetch_timeout.into_proto()),
496        }
497    }
498
499    fn from_proto(proto: ProtoKafkaTimeouts) -> Result<Self, TryFromProtoError> {
500        Ok(mz_kafka_util::client::TimeoutConfig {
501            keepalive: proto.keepalive,
502            socket_timeout: proto
503                .socket_timeout
504                .into_rust_if_some("ProtoKafkaSourceTcpTimeouts::socket_timeout")?,
505            transaction_timeout: proto
506                .transaction_timeout
507                .into_rust_if_some("ProtoKafkaSourceTcpTimeouts::transaction_timeout")?,
508            socket_connection_setup_timeout: proto
509                .socket_connection_setup_timeout
510                .into_rust_if_some(
511                    "ProtoKafkaSourceTcpTimeouts::socket_connection_setup_timeout",
512                )?,
513            fetch_metadata_timeout: proto
514                .fetch_metadata_timeout
515                .into_rust_if_some("ProtoKafkaSourceTcpTimeouts::fetch_metadata_timeout")?,
516            progress_record_fetch_timeout: proto
517                .progress_record_fetch_timeout
518                .into_rust_if_some("ProtoKafkaSourceTcpTimeouts::progress_record_fetch_timeout")?,
519        })
520    }
521}
522
523impl RustType<ProtoMySqlSourceTimeouts> for mz_mysql_util::TimeoutConfig {
524    fn into_proto(&self) -> ProtoMySqlSourceTimeouts {
525        ProtoMySqlSourceTimeouts {
526            tcp_keepalive: self.tcp_keepalive.into_proto(),
527            snapshot_max_execution_time: self.snapshot_max_execution_time.into_proto(),
528            snapshot_lock_wait_timeout: self.snapshot_lock_wait_timeout.into_proto(),
529            connect_timeout: self.connect_timeout.into_proto(),
530        }
531    }
532
533    fn from_proto(proto: ProtoMySqlSourceTimeouts) -> Result<Self, TryFromProtoError> {
534        Ok(mz_mysql_util::TimeoutConfig {
535            tcp_keepalive: proto.tcp_keepalive.into_rust()?,
536            snapshot_max_execution_time: proto.snapshot_max_execution_time.into_rust()?,
537            snapshot_lock_wait_timeout: proto.snapshot_lock_wait_timeout.into_rust()?,
538            connect_timeout: proto.connect_timeout.into_rust()?,
539        })
540    }
541}
542
543impl RustType<ProtoSshTimeoutConfig> for SshTimeoutConfig {
544    fn into_proto(&self) -> ProtoSshTimeoutConfig {
545        ProtoSshTimeoutConfig {
546            check_interval: Some(self.check_interval.into_proto()),
547            connect_timeout: Some(self.connect_timeout.into_proto()),
548            keepalives_idle: Some(self.keepalives_idle.into_proto()),
549        }
550    }
551
552    fn from_proto(proto: ProtoSshTimeoutConfig) -> Result<Self, TryFromProtoError> {
553        Ok(SshTimeoutConfig {
554            check_interval: proto
555                .check_interval
556                .into_rust_if_some("ProtoSshTimeoutConfig::check_interval")?,
557            connect_timeout: proto
558                .connect_timeout
559                .into_rust_if_some("ProtoSshTimeoutConfig::connect_timeout")?,
560            keepalives_idle: proto
561                .keepalives_idle
562                .into_rust_if_some("ProtoSshTimeoutConfig::keepalives_idle")?,
563        })
564    }
565}