mz_adapter/
flags.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::time::Duration;
11
12use mz_compute_client::protocol::command::ComputeParameters;
13use mz_orchestrator::scheduling_config::{ServiceSchedulingConfig, ServiceTopologySpreadConfig};
14use mz_ore::cast::CastFrom;
15use mz_ore::error::ErrorExt;
16use mz_service::params::GrpcClientParameters;
17use mz_sql::session::vars::SystemVars;
18use mz_storage_types::parameters::{
19    PgSourceSnapshotConfig, StorageMaxInflightBytesConfig, StorageParameters, UpsertAutoSpillConfig,
20};
21use mz_tracing::params::TracingParameters;
22
23use mz_timestamp_oracle::postgres_oracle::PostgresTimestampOracleParameters;
24
25/// Return the current compute configuration, derived from the system configuration.
26pub fn compute_config(config: &SystemVars) -> ComputeParameters {
27    ComputeParameters {
28        workload_class: None,
29        max_result_size: Some(config.max_result_size()),
30        tracing: tracing_config(config),
31        grpc_client: grpc_client_config(config),
32        dyncfg_updates: config.dyncfg_updates(),
33    }
34}
35
36/// Return the current storage configuration, derived from the system configuration.
37pub fn storage_config(config: &SystemVars) -> StorageParameters {
38    StorageParameters {
39        pg_source_connect_timeout: Some(config.pg_source_connect_timeout()),
40        pg_source_tcp_keepalives_retries: Some(config.pg_source_tcp_keepalives_retries()),
41        pg_source_tcp_keepalives_idle: Some(config.pg_source_tcp_keepalives_idle()),
42        pg_source_tcp_keepalives_interval: Some(config.pg_source_tcp_keepalives_interval()),
43        pg_source_tcp_user_timeout: Some(config.pg_source_tcp_user_timeout()),
44        pg_source_tcp_configure_server: config.pg_source_tcp_configure_server(),
45        pg_source_snapshot_statement_timeout: config.pg_source_snapshot_statement_timeout(),
46        pg_source_wal_sender_timeout: config.pg_source_wal_sender_timeout(),
47        mysql_source_timeouts: mz_mysql_util::TimeoutConfig::build(
48            config.mysql_source_snapshot_max_execution_time(),
49            config.mysql_source_snapshot_lock_wait_timeout(),
50            config.mysql_source_tcp_keepalive(),
51            config.mysql_source_connect_timeout(),
52        ),
53        keep_n_source_status_history_entries: config.keep_n_source_status_history_entries(),
54        keep_n_sink_status_history_entries: config.keep_n_sink_status_history_entries(),
55        keep_n_privatelink_status_history_entries: config
56            .keep_n_privatelink_status_history_entries(),
57        replica_status_history_retention_window: config.replica_status_history_retention_window(),
58        upsert_rocksdb_tuning_config: {
59            match mz_rocksdb_types::RocksDBTuningParameters::from_parameters(
60                config.upsert_rocksdb_compaction_style(),
61                config.upsert_rocksdb_optimize_compaction_memtable_budget(),
62                config.upsert_rocksdb_level_compaction_dynamic_level_bytes(),
63                config.upsert_rocksdb_universal_compaction_ratio(),
64                config.upsert_rocksdb_parallelism(),
65                config.upsert_rocksdb_compression_type(),
66                config.upsert_rocksdb_bottommost_compression_type(),
67                config.upsert_rocksdb_batch_size(),
68                config.upsert_rocksdb_retry_duration(),
69                config.upsert_rocksdb_stats_log_interval_seconds(),
70                config.upsert_rocksdb_stats_persist_interval_seconds(),
71                config.upsert_rocksdb_point_lookup_block_cache_size_mb(),
72                config.upsert_rocksdb_shrink_allocated_buffers_by_ratio(),
73                config.upsert_rocksdb_write_buffer_manager_memory_bytes(),
74                config
75                    .upsert_rocksdb_write_buffer_manager_cluster_memory_fraction()
76                    .and_then(|d| match d.try_into() {
77                        Err(e) => {
78                            tracing::error!(
79                                "Couldn't convert upsert_rocksdb_write_buffer_manager_cluster_memory_fraction {:?} to f64, so defaulting to `None`: {e:?}",
80                                config
81                                    .upsert_rocksdb_write_buffer_manager_cluster_memory_fraction()
82                            );
83                            None
84                        }
85                        Ok(o) => Some(o),
86                    }),
87                config.upsert_rocksdb_write_buffer_manager_allow_stall(),
88            ) {
89                Ok(u) => u,
90                Err(e) => {
91                    tracing::warn!(
92                        "Failed to deserialize upsert_rocksdb parameters \
93                            into a `RocksDBTuningParameters`, \
94                            failing back to reasonable defaults: {}",
95                        e.display_with_causes()
96                    );
97                    mz_rocksdb_types::RocksDBTuningParameters::default()
98                }
99            }
100        },
101        finalize_shards: config.enable_storage_shard_finalization(),
102        tracing: tracing_config(config),
103        upsert_auto_spill_config: UpsertAutoSpillConfig {
104            allow_spilling_to_disk: config.upsert_rocksdb_auto_spill_to_disk(),
105            spill_to_disk_threshold_bytes: config.upsert_rocksdb_auto_spill_threshold_bytes(),
106        },
107        storage_dataflow_max_inflight_bytes_config: StorageMaxInflightBytesConfig {
108            max_inflight_bytes_default: config.storage_dataflow_max_inflight_bytes(),
109            // Interpret the `Numeric` as a float here, we don't need perfect
110            // precision for a percentage. Unfortunately `Decimal` makes us handle errors.
111            max_inflight_bytes_cluster_size_fraction: config
112                .storage_dataflow_max_inflight_bytes_to_cluster_size_fraction()
113                .and_then(|d| match d.try_into() {
114                    Err(e) => {
115                        tracing::error!(
116                            "Couldn't convert {:?} to f64, so defaulting to `None`: {e:?}",
117                            config.storage_dataflow_max_inflight_bytes_to_cluster_size_fraction()
118                        );
119                        None
120                    }
121                    Ok(o) => Some(o),
122                }),
123            disk_only: config.storage_dataflow_max_inflight_bytes_disk_only(),
124        },
125        grpc_client: grpc_client_config(config),
126        shrink_upsert_unused_buffers_by_ratio: config
127            .storage_shrink_upsert_unused_buffers_by_ratio(),
128        record_namespaced_errors: config.storage_record_source_sink_namespaced_errors(),
129        ssh_timeout_config: mz_ssh_util::tunnel::SshTimeoutConfig {
130            check_interval: config.ssh_check_interval(),
131            connect_timeout: config.ssh_connect_timeout(),
132            keepalives_idle: config.ssh_keepalives_idle(),
133        },
134        kafka_timeout_config: mz_kafka_util::client::TimeoutConfig::build(
135            config.kafka_socket_keepalive(),
136            config.kafka_socket_timeout(),
137            config.kafka_transaction_timeout(),
138            config.kafka_socket_connection_setup_timeout(),
139            config.kafka_fetch_metadata_timeout(),
140            config.kafka_progress_record_fetch_timeout(),
141        ),
142        statistics_interval: config.storage_statistics_interval(),
143        statistics_collection_interval: config.storage_statistics_collection_interval(),
144        pg_snapshot_config: PgSourceSnapshotConfig {
145            collect_strict_count: config.pg_source_snapshot_collect_strict_count(),
146            fallback_to_strict_count: config.pg_source_snapshot_fallback_to_strict_count(),
147            wait_for_count: config.pg_source_snapshot_wait_for_count(),
148        },
149        user_storage_managed_collections_batch_duration: config
150            .user_storage_managed_collections_batch_duration(),
151        dyncfg_updates: config.dyncfg_updates(),
152    }
153}
154
155pub fn tracing_config(config: &SystemVars) -> TracingParameters {
156    TracingParameters {
157        log_filter: Some(config.logging_filter()),
158        opentelemetry_filter: Some(config.opentelemetry_filter()),
159        log_filter_defaults: config.logging_filter_defaults(),
160        opentelemetry_filter_defaults: config.opentelemetry_filter_defaults(),
161        sentry_filters: config.sentry_filters(),
162    }
163}
164
165pub fn caching_config(config: &SystemVars) -> mz_secrets::CachingPolicy {
166    let ttl_secs = config.webhooks_secrets_caching_ttl_secs();
167    mz_secrets::CachingPolicy {
168        enabled: ttl_secs > 0,
169        ttl: Duration::from_secs(u64::cast_from(ttl_secs)),
170    }
171}
172
173pub fn pg_timstamp_oracle_config(config: &SystemVars) -> PostgresTimestampOracleParameters {
174    PostgresTimestampOracleParameters {
175        pg_connection_pool_max_size: Some(config.pg_timestamp_oracle_connection_pool_max_size()),
176        pg_connection_pool_max_wait: Some(config.pg_timestamp_oracle_connection_pool_max_wait()),
177        pg_connection_pool_ttl: Some(config.pg_timestamp_oracle_connection_pool_ttl()),
178        pg_connection_pool_ttl_stagger: Some(
179            config.pg_timestamp_oracle_connection_pool_ttl_stagger(),
180        ),
181        // We use a shared set of crdb flags for the basics, but the above flags
182        // for the connection pool are specific to the postgres/crdb timestamp
183        // oracle.
184        pg_connection_pool_connect_timeout: Some(config.crdb_connect_timeout()),
185        pg_connection_pool_tcp_user_timeout: Some(config.crdb_tcp_user_timeout()),
186    }
187}
188
189fn grpc_client_config(config: &SystemVars) -> GrpcClientParameters {
190    GrpcClientParameters {
191        connect_timeout: Some(config.grpc_connect_timeout()),
192        http2_keep_alive_interval: Some(config.grpc_client_http2_keep_alive_interval()),
193        http2_keep_alive_timeout: Some(config.grpc_client_http2_keep_alive_timeout()),
194    }
195}
196
197pub fn orchestrator_scheduling_config(config: &SystemVars) -> ServiceSchedulingConfig {
198    ServiceSchedulingConfig {
199        multi_pod_az_affinity_weight: config.cluster_multi_process_replica_az_affinity_weight(),
200        soften_replication_anti_affinity: config.cluster_soften_replication_anti_affinity(),
201        soften_replication_anti_affinity_weight: config
202            .cluster_soften_replication_anti_affinity_weight(),
203        topology_spread: ServiceTopologySpreadConfig {
204            enabled: config.cluster_enable_topology_spread(),
205            ignore_non_singular_scale: config.cluster_topology_spread_ignore_non_singular_scale(),
206            max_skew: config.cluster_topology_spread_max_skew(),
207            soft: config.cluster_topology_spread_soft(),
208        },
209        soften_az_affinity: config.cluster_soften_az_affinity(),
210        soften_az_affinity_weight: config.cluster_soften_az_affinity_weight(),
211        always_use_disk: config.cluster_always_use_disk(),
212        security_context_enabled: config.cluster_security_context_enabled(),
213    }
214}