mz_storage_types/
parameters.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Configuration parameter types.
11
12use std::time::Duration;
13
14use mz_service::params::GrpcClientParameters;
15use mz_ssh_util::tunnel::SshTimeoutConfig;
16use mz_tracing::params::TracingParameters;
17use serde::{Deserialize, Serialize};
18
// Some of these defaults were recommended by @ph14
// https://github.com/MaterializeInc/materialize/pull/18644#discussion_r1160071692

/// Default connect timeout for PostgreSQL sources.
pub const DEFAULT_PG_SOURCE_CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// Default TCP keepalive interval for PostgreSQL sources.
pub const DEFAULT_PG_SOURCE_TCP_KEEPALIVES_INTERVAL: Duration = Duration::from_secs(10);
/// Default TCP keepalive idle time for PostgreSQL sources.
pub const DEFAULT_PG_SOURCE_TCP_KEEPALIVES_IDLE: Duration = Duration::from_secs(10);
/// Default number of TCP keepalive retries for PostgreSQL sources.
pub const DEFAULT_PG_SOURCE_TCP_KEEPALIVES_RETRIES: u32 = 5;
/// Default TCP user timeout for PostgreSQL sources.
// This is meant to be DEFAULT_KEEPALIVE_IDLE
// + DEFAULT_KEEPALIVE_RETRIES * DEFAULT_KEEPALIVE_INTERVAL
//
// NOTE(review): with the `DEFAULT_PG_SOURCE_TCP_KEEPALIVES_*` values declared
// above, that formula yields 10s + 5 * 10s = 60s rather than 40s. The formula
// may refer to differently-valued constants defined elsewhere; confirm which
// value is intended before changing either side.
pub const DEFAULT_PG_SOURCE_TCP_USER_TIMEOUT: Duration = Duration::from_secs(40);
28
/// Whether to apply TCP settings to the server as well as the client.
///
/// These options are generally considered something that the upstream DBA
/// should configure, so we don't override them by default.
pub const DEFAULT_PG_SOURCE_TCP_CONFIGURE_SERVER: bool = false;
34
/// The default value for the `wal_sender_timeout` option for PostgreSQL sources.
///
/// See: <https://www.postgresql.org/docs/current/runtime-config-replication.html>
///
/// This option is generally considered something that the upstream DBA should
/// configure, so we don't override it by default (the default is `None`).
pub const DEFAULT_PG_SOURCE_WAL_SENDER_TIMEOUT: Option<Duration> = None;
42
/// Storage instance configuration parameters.
///
/// Parameters can be set (`Some`) or unset (`None`).
/// Unset parameters should be interpreted to mean "use the previous value".
// NOTE(review): `StorageParameters::update` in this file assigns the `Option`
// fields unconditionally (an incoming `None` replaces a previous `Some`), and
// many fields are not `Option`s at all. Confirm whether the set/unset
// description above still reflects the intended contract.
//
// NOTE: this type derives `Serialize`/`Deserialize`; be careful when
// reordering or retyping fields, as that may affect the serialized form.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct StorageParameters {
    /// Connect timeout for PostgreSQL sources.
    ///
    /// `Default` uses `DEFAULT_PG_SOURCE_CONNECT_TIMEOUT`.
    pub pg_source_connect_timeout: Option<Duration>,
    /// Number of TCP keepalive retries for PostgreSQL sources.
    ///
    /// `Default` uses `DEFAULT_PG_SOURCE_TCP_KEEPALIVES_RETRIES`.
    pub pg_source_tcp_keepalives_retries: Option<u32>,
    /// TCP keepalive idle time for PostgreSQL sources.
    ///
    /// `Default` uses `DEFAULT_PG_SOURCE_TCP_KEEPALIVES_IDLE`.
    pub pg_source_tcp_keepalives_idle: Option<Duration>,
    /// TCP keepalive interval for PostgreSQL sources.
    ///
    /// `Default` uses `DEFAULT_PG_SOURCE_TCP_KEEPALIVES_INTERVAL`.
    pub pg_source_tcp_keepalives_interval: Option<Duration>,
    /// TCP user timeout for PostgreSQL sources.
    ///
    /// `Default` uses `DEFAULT_PG_SOURCE_TCP_USER_TIMEOUT`.
    pub pg_source_tcp_user_timeout: Option<Duration>,
    /// Whether to apply the configuration on the server as well.
    ///
    /// By default, the timeouts are only applied on the client.
    pub pg_source_tcp_configure_server: bool,
    /// Statement timeout for PostgreSQL source snapshots.
    ///
    /// `Default` uses `mz_postgres_util::DEFAULT_SNAPSHOT_STATEMENT_TIMEOUT`.
    pub pg_source_snapshot_statement_timeout: Duration,
    /// Value for the `wal_sender_timeout` option for PostgreSQL sources.
    ///
    /// `Default` uses `DEFAULT_PG_SOURCE_WAL_SENDER_TIMEOUT`, which is `None`.
    pub pg_source_wal_sender_timeout: Option<Duration>,
    /// Timeout configuration for MySQL sources.
    pub mysql_source_timeouts: mz_mysql_util::TimeoutConfig,
    /// Number of source status history entries to keep.
    pub keep_n_source_status_history_entries: usize,
    /// Number of sink status history entries to keep.
    pub keep_n_sink_status_history_entries: usize,
    /// Number of privatelink status history entries to keep.
    pub keep_n_privatelink_status_history_entries: usize,
    /// Retention window for the replica status history.
    ///
    /// `Default` uses `REPLICA_STATUS_HISTORY_RETENTION_WINDOW_DEFAULT` (30 days).
    pub replica_status_history_retention_window: Duration,
    /// A set of parameters used to tune RocksDB when used with `UPSERT` sources.
    pub upsert_rocksdb_tuning_config: mz_rocksdb_types::RocksDBTuningParameters,
    /// Whether or not to allow shard finalization to occur. Note that this will
    /// only disable the actual finalization of shards, not registering them for
    /// finalization.
    pub finalize_shards: bool,
    /// Tracing parameters.
    pub tracing: TracingParameters,
    /// A set of parameters used to configure the maximum number of in-flight bytes
    /// emitted by persist_sources feeding storage dataflows
    pub storage_dataflow_max_inflight_bytes_config: StorageMaxInflightBytesConfig,
    /// gRPC client parameters.
    pub grpc_client: GrpcClientParameters,
    /// Configuration ratio to shrink upsert buffers by.
    pub shrink_upsert_unused_buffers_by_ratio: usize,
    /// Whether or not to record errors by namespace in the `details`
    /// column of the status history tables.
    pub record_namespaced_errors: bool,
    /// Networking configuration for ssh connections.
    pub ssh_timeout_config: SshTimeoutConfig,
    /// Networking configuration for kafka connections.
    pub kafka_timeout_config: mz_kafka_util::client::TimeoutConfig,
    /// The interval to emit records to statistics tables
    pub statistics_interval: Duration,
    /// The interval to _collect statistics_ within clusterd.
    // Note: this interval configures the level of granularity we expect statistics
    // (at least with this implementation) to have. We expect a statistic in the
    // system tables to be only accurate to within this interval + whatever
    // skew the `CollectionManager` adds. The stats task in the controller will
    // be reporting, for each worker, on some interval,
    // the statistics reported by the most recent collection of statistics as
    // defined by this interval. This is known to be somewhat inaccurate,
    // but people mostly care about either rates, or the values to within 1 minute.
    pub statistics_collection_interval: Duration,
    /// Configuration for how storage performs Postgres snapshots.
    pub pg_snapshot_config: PgSourceSnapshotConfig,
    /// Duration that we wait to batch rows for user owned, storage managed, collections.
    pub user_storage_managed_collections_batch_duration: Duration,

    /// Updates used to update `StorageConfiguration::config_set`.
    pub dyncfg_updates: mz_dyncfg::ConfigUpdates,
}
105
/// Default interval at which records are emitted to the statistics tables.
pub const STATISTICS_INTERVAL_DEFAULT: Duration = Duration::from_secs(60);
/// Default interval at which statistics are collected.
pub const STATISTICS_COLLECTION_INTERVAL_DEFAULT: Duration = Duration::from_secs(10);
/// Default duration to wait when batching rows for storage managed collections.
pub const STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT: Duration = Duration::from_secs(1);
/// Default retention window for the replica status history: 30 days.
pub const REPLICA_STATUS_HISTORY_RETENTION_WINDOW_DEFAULT: Duration = {
    const SECS_PER_DAY: u64 = 24 * 60 * 60;
    Duration::from_secs(30 * SECS_PER_DAY)
};
111
112// Implement `Default` manually, so that the default can match the
113// LD default. This is not strictly necessary, but improves clarity.
114impl Default for StorageParameters {
115    fn default() -> Self {
116        Self {
117            pg_source_connect_timeout: Some(DEFAULT_PG_SOURCE_CONNECT_TIMEOUT),
118            pg_source_tcp_keepalives_retries: Some(DEFAULT_PG_SOURCE_TCP_KEEPALIVES_RETRIES),
119            pg_source_tcp_keepalives_idle: Some(DEFAULT_PG_SOURCE_TCP_KEEPALIVES_IDLE),
120            pg_source_tcp_keepalives_interval: Some(DEFAULT_PG_SOURCE_TCP_KEEPALIVES_INTERVAL),
121            pg_source_tcp_user_timeout: Some(DEFAULT_PG_SOURCE_TCP_USER_TIMEOUT),
122            pg_source_snapshot_statement_timeout:
123                mz_postgres_util::DEFAULT_SNAPSHOT_STATEMENT_TIMEOUT,
124            pg_source_wal_sender_timeout: DEFAULT_PG_SOURCE_WAL_SENDER_TIMEOUT,
125            pg_source_tcp_configure_server: DEFAULT_PG_SOURCE_TCP_CONFIGURE_SERVER,
126            mysql_source_timeouts: Default::default(),
127            keep_n_source_status_history_entries: Default::default(),
128            keep_n_sink_status_history_entries: Default::default(),
129            keep_n_privatelink_status_history_entries: Default::default(),
130            replica_status_history_retention_window:
131                REPLICA_STATUS_HISTORY_RETENTION_WINDOW_DEFAULT,
132            upsert_rocksdb_tuning_config: Default::default(),
133            finalize_shards: Default::default(),
134            tracing: Default::default(),
135            storage_dataflow_max_inflight_bytes_config: Default::default(),
136            grpc_client: Default::default(),
137            shrink_upsert_unused_buffers_by_ratio: Default::default(),
138            record_namespaced_errors: true,
139            ssh_timeout_config: Default::default(),
140            kafka_timeout_config: Default::default(),
141            statistics_interval: STATISTICS_INTERVAL_DEFAULT,
142            statistics_collection_interval: STATISTICS_COLLECTION_INTERVAL_DEFAULT,
143            pg_snapshot_config: Default::default(),
144            user_storage_managed_collections_batch_duration:
145                STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT,
146            dyncfg_updates: Default::default(),
147        }
148    }
149}
150
/// A set of parameters used to configure the maximum number of in-flight bytes
/// emitted by persist_sources feeding storage dataflows.
///
/// The `Default` implementation leaves both limits unset (`None`) and sets
/// `disk_only` to `true`.
// NOTE: this type derives `Serialize`/`Deserialize`; be careful when
// reordering or retyping fields, as that may affect the serialized form.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct StorageMaxInflightBytesConfig {
    /// The default value for the max in-flight bytes.
    pub max_inflight_bytes_default: Option<usize>,
    /// Fraction of the memory limit of the cluster in use to be used as the
    /// max in-flight bytes for backpressure.
    pub max_inflight_bytes_cluster_size_fraction: Option<f64>,
    /// Whether or not the above configs only apply to disk-using dataflows.
    pub disk_only: bool,
}
161
162impl Default for StorageMaxInflightBytesConfig {
163    fn default() -> Self {
164        Self {
165            max_inflight_bytes_default: Default::default(),
166            max_inflight_bytes_cluster_size_fraction: Default::default(),
167            disk_only: true,
168        }
169    }
170}
171
172impl StorageParameters {
173    /// Update the parameter values with the set ones from `other`.
174    ///
175    /// NOTE that this does not necessarily update all global state related
176    /// to these parameters, like persist parameters.
177    pub fn update(
178        &mut self,
179        StorageParameters {
180            pg_source_connect_timeout,
181            pg_source_tcp_keepalives_retries,
182            pg_source_tcp_keepalives_idle,
183            pg_source_tcp_keepalives_interval,
184            pg_source_tcp_user_timeout,
185            pg_source_tcp_configure_server,
186            pg_source_snapshot_statement_timeout,
187            pg_source_wal_sender_timeout,
188            mysql_source_timeouts,
189            keep_n_source_status_history_entries,
190            keep_n_sink_status_history_entries,
191            keep_n_privatelink_status_history_entries,
192            replica_status_history_retention_window,
193            upsert_rocksdb_tuning_config,
194            finalize_shards,
195            tracing,
196            storage_dataflow_max_inflight_bytes_config,
197            grpc_client,
198            shrink_upsert_unused_buffers_by_ratio,
199            record_namespaced_errors,
200            ssh_timeout_config,
201            kafka_timeout_config,
202            statistics_interval,
203            statistics_collection_interval,
204            pg_snapshot_config,
205            user_storage_managed_collections_batch_duration,
206            dyncfg_updates,
207        }: StorageParameters,
208    ) {
209        self.pg_source_connect_timeout = pg_source_connect_timeout;
210        self.pg_source_tcp_keepalives_retries = pg_source_tcp_keepalives_retries;
211        self.pg_source_tcp_keepalives_idle = pg_source_tcp_keepalives_idle;
212        self.pg_source_tcp_keepalives_interval = pg_source_tcp_keepalives_interval;
213        self.pg_source_tcp_user_timeout = pg_source_tcp_user_timeout;
214        self.pg_source_tcp_configure_server = pg_source_tcp_configure_server;
215        self.pg_source_snapshot_statement_timeout = pg_source_snapshot_statement_timeout;
216        self.pg_source_wal_sender_timeout = pg_source_wal_sender_timeout;
217        self.mysql_source_timeouts = mysql_source_timeouts;
218        self.keep_n_source_status_history_entries = keep_n_source_status_history_entries;
219        self.keep_n_sink_status_history_entries = keep_n_sink_status_history_entries;
220        self.keep_n_privatelink_status_history_entries = keep_n_privatelink_status_history_entries;
221        self.replica_status_history_retention_window = replica_status_history_retention_window;
222        self.upsert_rocksdb_tuning_config = upsert_rocksdb_tuning_config;
223        self.finalize_shards = finalize_shards;
224        self.tracing.update(tracing);
225        self.finalize_shards = finalize_shards;
226        self.storage_dataflow_max_inflight_bytes_config =
227            storage_dataflow_max_inflight_bytes_config;
228        self.grpc_client.update(grpc_client);
229        self.shrink_upsert_unused_buffers_by_ratio = shrink_upsert_unused_buffers_by_ratio;
230        self.record_namespaced_errors = record_namespaced_errors;
231        self.ssh_timeout_config = ssh_timeout_config;
232        self.kafka_timeout_config = kafka_timeout_config;
233        // We set this in the statistics scraper tasks only once at startup.
234        self.statistics_interval = statistics_interval;
235        self.statistics_collection_interval = statistics_collection_interval;
236        self.pg_snapshot_config = pg_snapshot_config;
237        self.user_storage_managed_collections_batch_duration =
238            user_storage_managed_collections_batch_duration;
239        self.dyncfg_updates.extend(dyncfg_updates);
240    }
241
242    /// Return whether all parameters are unset.
243    pub fn all_unset(&self) -> bool {
244        *self == Self::default()
245    }
246}
247
/// Configuration for how storage performs Postgres snapshots.
///
/// Construct it with `PgSourceSnapshotConfig::new()` or `Default::default()`;
/// both enable `collect_strict_count`.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct PgSourceSnapshotConfig {
    /// Whether or not to collect a strict `count(*)` for each table during snapshotting.
    /// This is more accurate but way more expensive compared to an estimate in `pg_class`.
    /// For this reason it is only attempted when the estimated count is low enough.
    pub collect_strict_count: bool,
}
256
257impl PgSourceSnapshotConfig {
258    pub const fn new() -> Self {
259        PgSourceSnapshotConfig {
260            // We want accurate values, if its not too expensive.
261            collect_strict_count: true,
262        }
263    }
264}
265
266impl Default for PgSourceSnapshotConfig {
267    fn default() -> Self {
268        Self::new()
269    }
270}