1use std::time::Duration;
11
12use mz_compute_client::protocol::command::ComputeParameters;
13use mz_orchestrator::scheduling_config::{ServiceSchedulingConfig, ServiceTopologySpreadConfig};
14use mz_ore::cast::CastFrom;
15use mz_ore::error::ErrorExt;
16use mz_service::params::GrpcClientParameters;
17use mz_sql::session::vars::SystemVars;
18use mz_storage_types::parameters::{
19 PgSourceSnapshotConfig, StorageMaxInflightBytesConfig, StorageParameters,
20};
21use mz_tracing::params::TracingParameters;
22
23use mz_timestamp_oracle::postgres_oracle::PostgresTimestampOracleParameters;
24
25pub fn compute_config(config: &SystemVars) -> ComputeParameters {
27 ComputeParameters {
28 workload_class: None,
29 max_result_size: Some(config.max_result_size()),
30 tracing: tracing_config(config),
31 grpc_client: grpc_client_config(config),
32 dyncfg_updates: config.dyncfg_updates(),
33 }
34}
35
36pub fn storage_config(config: &SystemVars) -> StorageParameters {
38 StorageParameters {
39 pg_source_connect_timeout: Some(config.pg_source_connect_timeout()),
40 pg_source_tcp_keepalives_retries: Some(config.pg_source_tcp_keepalives_retries()),
41 pg_source_tcp_keepalives_idle: Some(config.pg_source_tcp_keepalives_idle()),
42 pg_source_tcp_keepalives_interval: Some(config.pg_source_tcp_keepalives_interval()),
43 pg_source_tcp_user_timeout: Some(config.pg_source_tcp_user_timeout()),
44 pg_source_tcp_configure_server: config.pg_source_tcp_configure_server(),
45 pg_source_snapshot_statement_timeout: config.pg_source_snapshot_statement_timeout(),
46 pg_source_wal_sender_timeout: config.pg_source_wal_sender_timeout(),
47 mysql_source_timeouts: mz_mysql_util::TimeoutConfig::build(
48 config.mysql_source_snapshot_max_execution_time(),
49 config.mysql_source_snapshot_lock_wait_timeout(),
50 config.mysql_source_tcp_keepalive(),
51 config.mysql_source_connect_timeout(),
52 ),
53 keep_n_source_status_history_entries: config.keep_n_source_status_history_entries(),
54 keep_n_sink_status_history_entries: config.keep_n_sink_status_history_entries(),
55 keep_n_privatelink_status_history_entries: config
56 .keep_n_privatelink_status_history_entries(),
57 replica_status_history_retention_window: config.replica_status_history_retention_window(),
58 upsert_rocksdb_tuning_config: {
59 match mz_rocksdb_types::RocksDBTuningParameters::from_parameters(
60 config.upsert_rocksdb_compaction_style(),
61 config.upsert_rocksdb_optimize_compaction_memtable_budget(),
62 config.upsert_rocksdb_level_compaction_dynamic_level_bytes(),
63 config.upsert_rocksdb_universal_compaction_ratio(),
64 config.upsert_rocksdb_parallelism(),
65 config.upsert_rocksdb_compression_type(),
66 config.upsert_rocksdb_bottommost_compression_type(),
67 config.upsert_rocksdb_batch_size(),
68 config.upsert_rocksdb_retry_duration(),
69 config.upsert_rocksdb_stats_log_interval_seconds(),
70 config.upsert_rocksdb_stats_persist_interval_seconds(),
71 config.upsert_rocksdb_point_lookup_block_cache_size_mb(),
72 config.upsert_rocksdb_shrink_allocated_buffers_by_ratio(),
73 config.upsert_rocksdb_write_buffer_manager_memory_bytes(),
74 config
75 .upsert_rocksdb_write_buffer_manager_cluster_memory_fraction()
76 .and_then(|d| match d.try_into() {
77 Err(e) => {
78 tracing::error!(
79 "Couldn't convert upsert_rocksdb_write_buffer_manager_cluster_memory_fraction {:?} to f64, so defaulting to `None`: {e:?}",
80 config
81 .upsert_rocksdb_write_buffer_manager_cluster_memory_fraction()
82 );
83 None
84 }
85 Ok(o) => Some(o),
86 }),
87 config.upsert_rocksdb_write_buffer_manager_allow_stall(),
88 ) {
89 Ok(u) => u,
90 Err(e) => {
91 tracing::warn!(
92 "Failed to deserialize upsert_rocksdb parameters \
93 into a `RocksDBTuningParameters`, \
94 failing back to reasonable defaults: {}",
95 e.display_with_causes()
96 );
97 mz_rocksdb_types::RocksDBTuningParameters::default()
98 }
99 }
100 },
101 finalize_shards: config.enable_storage_shard_finalization(),
102 tracing: tracing_config(config),
103 storage_dataflow_max_inflight_bytes_config: StorageMaxInflightBytesConfig {
104 max_inflight_bytes_default: config.storage_dataflow_max_inflight_bytes(),
105 max_inflight_bytes_cluster_size_fraction: config
108 .storage_dataflow_max_inflight_bytes_to_cluster_size_fraction()
109 .and_then(|d| match d.try_into() {
110 Err(e) => {
111 tracing::error!(
112 "Couldn't convert {:?} to f64, so defaulting to `None`: {e:?}",
113 config.storage_dataflow_max_inflight_bytes_to_cluster_size_fraction()
114 );
115 None
116 }
117 Ok(o) => Some(o),
118 }),
119 disk_only: config.storage_dataflow_max_inflight_bytes_disk_only(),
120 },
121 grpc_client: grpc_client_config(config),
122 shrink_upsert_unused_buffers_by_ratio: config
123 .storage_shrink_upsert_unused_buffers_by_ratio(),
124 record_namespaced_errors: config.storage_record_source_sink_namespaced_errors(),
125 ssh_timeout_config: mz_ssh_util::tunnel::SshTimeoutConfig {
126 check_interval: config.ssh_check_interval(),
127 connect_timeout: config.ssh_connect_timeout(),
128 keepalives_idle: config.ssh_keepalives_idle(),
129 },
130 kafka_timeout_config: mz_kafka_util::client::TimeoutConfig::build(
131 config.kafka_socket_keepalive(),
132 config.kafka_socket_timeout(),
133 config.kafka_transaction_timeout(),
134 config.kafka_socket_connection_setup_timeout(),
135 config.kafka_fetch_metadata_timeout(),
136 config.kafka_progress_record_fetch_timeout(),
137 ),
138 statistics_interval: config.storage_statistics_interval(),
139 statistics_collection_interval: config.storage_statistics_collection_interval(),
140 pg_snapshot_config: PgSourceSnapshotConfig {
141 collect_strict_count: config.pg_source_snapshot_collect_strict_count(),
142 },
143 user_storage_managed_collections_batch_duration: config
144 .user_storage_managed_collections_batch_duration(),
145 dyncfg_updates: config.dyncfg_updates(),
146 }
147}
148
149pub fn tracing_config(config: &SystemVars) -> TracingParameters {
150 TracingParameters {
151 log_filter: Some(config.logging_filter()),
152 opentelemetry_filter: Some(config.opentelemetry_filter()),
153 log_filter_defaults: config.logging_filter_defaults(),
154 opentelemetry_filter_defaults: config.opentelemetry_filter_defaults(),
155 sentry_filters: config.sentry_filters(),
156 }
157}
158
159pub fn caching_config(config: &SystemVars) -> mz_secrets::CachingPolicy {
160 let ttl_secs = config.webhooks_secrets_caching_ttl_secs();
161 mz_secrets::CachingPolicy {
162 enabled: ttl_secs > 0,
163 ttl: Duration::from_secs(u64::cast_from(ttl_secs)),
164 }
165}
166
167pub fn pg_timstamp_oracle_config(config: &SystemVars) -> PostgresTimestampOracleParameters {
168 PostgresTimestampOracleParameters {
169 pg_connection_pool_max_size: Some(config.pg_timestamp_oracle_connection_pool_max_size()),
170 pg_connection_pool_max_wait: Some(config.pg_timestamp_oracle_connection_pool_max_wait()),
171 pg_connection_pool_ttl: Some(config.pg_timestamp_oracle_connection_pool_ttl()),
172 pg_connection_pool_ttl_stagger: Some(
173 config.pg_timestamp_oracle_connection_pool_ttl_stagger(),
174 ),
175 pg_connection_pool_connect_timeout: Some(config.crdb_connect_timeout()),
179 pg_connection_pool_tcp_user_timeout: Some(config.crdb_tcp_user_timeout()),
180 }
181}
182
183fn grpc_client_config(config: &SystemVars) -> GrpcClientParameters {
184 GrpcClientParameters {
185 connect_timeout: Some(config.grpc_connect_timeout()),
186 http2_keep_alive_interval: Some(config.grpc_client_http2_keep_alive_interval()),
187 http2_keep_alive_timeout: Some(config.grpc_client_http2_keep_alive_timeout()),
188 }
189}
190
191pub fn orchestrator_scheduling_config(config: &SystemVars) -> ServiceSchedulingConfig {
192 ServiceSchedulingConfig {
193 multi_pod_az_affinity_weight: config.cluster_multi_process_replica_az_affinity_weight(),
194 soften_replication_anti_affinity: config.cluster_soften_replication_anti_affinity(),
195 soften_replication_anti_affinity_weight: config
196 .cluster_soften_replication_anti_affinity_weight(),
197 topology_spread: ServiceTopologySpreadConfig {
198 enabled: config.cluster_enable_topology_spread(),
199 ignore_non_singular_scale: config.cluster_topology_spread_ignore_non_singular_scale(),
200 max_skew: config.cluster_topology_spread_max_skew(),
201 min_domains: config.cluster_topology_spread_set_min_domains(),
202 soft: config.cluster_topology_spread_soft(),
203 },
204 soften_az_affinity: config.cluster_soften_az_affinity(),
205 soften_az_affinity_weight: config.cluster_soften_az_affinity_weight(),
206 security_context_enabled: config.cluster_security_context_enabled(),
207 }
208}