1use std::collections::{BTreeMap, BTreeSet};
11
12use anyhow::bail;
13use bytesize::ByteSize;
14use ipnet::IpNet;
15use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
16use mz_auth::password::Password;
17use mz_build_info::BuildInfo;
18use mz_cloud_resources::AwsExternalIdPrefix;
19use mz_controller::clusters::ReplicaAllocation;
20use mz_license_keys::ValidatedLicenseKey;
21use mz_orchestrator::MemoryLimit;
22use mz_ore::cast::CastFrom;
23use mz_ore::metrics::MetricsRegistry;
24use mz_persist_client::PersistClient;
25use mz_repr::CatalogItemId;
26use mz_repr::adt::numeric::Numeric;
27use mz_sql::catalog::CatalogError as SqlCatalogError;
28use mz_sql::catalog::EnvironmentId;
29use serde::Serialize;
30
31use crate::durable::{CatalogError, DurableCatalogState};
32
33const GIB: u64 = 1024 * 1024 * 1024;
34
35#[derive(Debug)]
37pub struct Config<'a> {
38 pub storage: Box<dyn DurableCatalogState>,
40 pub metrics_registry: &'a MetricsRegistry,
42 pub state: StateConfig,
43}
44
45#[derive(Debug)]
46pub struct StateConfig {
47 pub unsafe_mode: bool,
49 pub all_features: bool,
51 pub build_info: &'static BuildInfo,
53 pub environment_id: EnvironmentId,
55 pub read_only: bool,
57 pub now: mz_ore::now::NowFn,
59 pub boot_ts: mz_repr::Timestamp,
61 pub skip_migrations: bool,
63 pub cluster_replica_sizes: ClusterReplicaSizeMap,
65 pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
67 pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
69 pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
71 pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
73 pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
75 pub system_parameter_defaults: BTreeMap<String, String>,
77 pub remote_system_parameters: Option<BTreeMap<String, String>>,
80 pub availability_zones: Vec<String>,
82 pub egress_addresses: Vec<IpNet>,
84 pub aws_principal_context: Option<AwsPrincipalContext>,
86 pub aws_privatelink_availability_zones: Option<BTreeSet<String>>,
88 pub http_host_name: Option<String>,
90 pub connection_context: mz_storage_types::connections::ConnectionContext,
92 pub builtin_item_migration_config: BuiltinItemMigrationConfig,
93 pub persist_client: PersistClient,
94 pub enable_expression_cache_override: Option<bool>,
97 pub enable_0dt_deployment: bool,
99 pub helm_chart_version: Option<String>,
101 pub external_login_password_mz_system: Option<Password>,
102 pub license_key: ValidatedLicenseKey,
103}
104
105#[derive(Debug)]
106pub struct BuiltinItemMigrationConfig {
107 pub persist_client: PersistClient,
108 pub read_only: bool,
109}
110
111#[derive(Debug, Clone, Serialize)]
112pub struct ClusterReplicaSizeMap(pub BTreeMap<String, ReplicaAllocation>);
113
114impl ClusterReplicaSizeMap {
115 pub fn parse_from_str(s: &str, credit_consumption_from_memory: bool) -> anyhow::Result<Self> {
116 let mut cluster_replica_sizes: BTreeMap<String, ReplicaAllocation> =
117 serde_json::from_str(s)?;
118 if credit_consumption_from_memory {
119 for (name, replica) in cluster_replica_sizes.iter_mut() {
120 let Some(memory_limit) = replica.memory_limit else {
121 bail!("No memory limit found in cluster definition for {name}");
122 };
123 replica.credits_per_hour = Numeric::from(
124 (memory_limit.0 * replica.scale * u64::try_from(replica.workers)?).0,
125 ) / Numeric::from(1 * GIB);
126 }
127 }
128 Ok(Self(cluster_replica_sizes))
129 }
130
131 pub fn enabled_allocations(&self) -> impl Iterator<Item = (&String, &ReplicaAllocation)> {
133 self.0.iter().filter(|(_, a)| !a.disabled)
134 }
135
136 pub fn get_allocation_by_name(&self, name: &str) -> Result<&ReplicaAllocation, CatalogError> {
139 self.0.get(name).ok_or_else(|| {
140 CatalogError::Catalog(SqlCatalogError::UnknownClusterReplicaSize(name.into()))
141 })
142 }
143
144 pub fn for_tests() -> Self {
149 let mut inner = (0..=5)
171 .flat_map(|i| {
172 let workers: u8 = 1 << i;
173 [
174 (format!("scale=1,workers={workers}"), None),
175 (format!("scale=1,workers={workers},mem=4GiB"), Some(4)),
176 (format!("scale=1,workers={workers},mem=8GiB"), Some(8)),
177 (format!("scale=1,workers={workers},mem=16GiB"), Some(16)),
178 (format!("scale=1,workers={workers},mem=32GiB"), Some(32)),
179 ]
180 .map(|(name, memory_limit)| {
181 (
182 name,
183 ReplicaAllocation {
184 memory_limit: memory_limit.map(|gib| MemoryLimit(ByteSize::gib(gib))),
185 cpu_limit: None,
186 disk_limit: None,
187 scale: 1,
188 workers: workers.into(),
189 credits_per_hour: 1.into(),
190 cpu_exclusive: false,
191 is_cc: false,
192 swap_enabled: false,
193 disabled: false,
194 selectors: BTreeMap::default(),
195 },
196 )
197 })
198 })
199 .collect::<BTreeMap<_, _>>();
200
201 for i in 1..=5 {
202 let scale = 1 << i;
203 inner.insert(
204 format!("scale={scale},workers=1"),
205 ReplicaAllocation {
206 memory_limit: None,
207 cpu_limit: None,
208 disk_limit: None,
209 scale,
210 workers: 1,
211 credits_per_hour: scale.into(),
212 cpu_exclusive: false,
213 is_cc: false,
214 swap_enabled: false,
215 disabled: false,
216 selectors: BTreeMap::default(),
217 },
218 );
219
220 inner.insert(
221 format!("scale={scale},workers={scale}"),
222 ReplicaAllocation {
223 memory_limit: None,
224 cpu_limit: None,
225 disk_limit: None,
226 scale,
227 workers: scale.into(),
228 credits_per_hour: scale.into(),
229 cpu_exclusive: false,
230 is_cc: false,
231 swap_enabled: false,
232 disabled: false,
233 selectors: BTreeMap::default(),
234 },
235 );
236
237 inner.insert(
238 format!("scale=1,workers=8,mem={scale}GiB"),
239 ReplicaAllocation {
240 memory_limit: Some(MemoryLimit(ByteSize(u64::cast_from(scale) * (1 << 30)))),
241 cpu_limit: None,
242 disk_limit: None,
243 scale: 1,
244 workers: 8,
245 credits_per_hour: 1.into(),
246 cpu_exclusive: false,
247 is_cc: false,
248 swap_enabled: false,
249 disabled: false,
250 selectors: BTreeMap::default(),
251 },
252 );
253 }
254
255 inner.insert(
256 "scale=2,workers=4".to_string(),
257 ReplicaAllocation {
258 memory_limit: None,
259 cpu_limit: None,
260 disk_limit: None,
261 scale: 2,
262 workers: 4,
263 credits_per_hour: 2.into(),
264 cpu_exclusive: false,
265 is_cc: false,
266 swap_enabled: false,
267 disabled: false,
268 selectors: BTreeMap::default(),
269 },
270 );
271
272 inner.insert(
273 "free".to_string(),
274 ReplicaAllocation {
275 memory_limit: None,
276 cpu_limit: None,
277 disk_limit: None,
278 scale: 0,
279 workers: 0,
280 credits_per_hour: 0.into(),
281 cpu_exclusive: false,
282 is_cc: true,
283 swap_enabled: false,
284 disabled: true,
285 selectors: BTreeMap::default(),
286 },
287 );
288
289 Self(inner)
290 }
291}
292
293#[derive(Debug, Clone, Serialize)]
298pub struct AwsPrincipalContext {
299 pub aws_account_id: String,
300 pub aws_external_id_prefix: AwsExternalIdPrefix,
301}
302
303impl AwsPrincipalContext {
304 pub fn to_principal_string(&self, aws_external_id_suffix: CatalogItemId) -> String {
305 format!(
306 "arn:aws:iam::{}:role/mz_{}_{}",
307 self.aws_account_id, self.aws_external_id_prefix, aws_external_id_suffix
308 )
309 }
310}