1use std::collections::{BTreeMap, BTreeSet};
11use std::num::NonZero;
12
13use anyhow::bail;
14use bytesize::ByteSize;
15use ipnet::IpNet;
16use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
17use mz_auth::password::Password;
18use mz_build_info::BuildInfo;
19use mz_cloud_resources::AwsExternalIdPrefix;
20use mz_controller::clusters::ReplicaAllocation;
21use mz_license_keys::ValidatedLicenseKey;
22use mz_orchestrator::MemoryLimit;
23use mz_ore::cast::CastFrom;
24use mz_ore::metrics::MetricsRegistry;
25use mz_persist_client::PersistClient;
26use mz_repr::CatalogItemId;
27use mz_repr::adt::numeric::Numeric;
28use mz_sql::catalog::CatalogError as SqlCatalogError;
29use mz_sql::catalog::EnvironmentId;
30use serde::Serialize;
31
32use crate::durable::{CatalogError, DurableCatalogState};
33
/// Number of bytes in one gibibyte (2^30).
const GIB: u64 = 1 << 30;
35
/// Configuration bundle pairing a durable catalog storage handle and a metrics
/// registry with the remaining [`StateConfig`] settings.
///
/// NOTE(review): how these pieces are consumed is not visible in this file;
/// presumably they are the inputs for opening a catalog. Confirm against callers.
#[derive(Debug)]
pub struct Config<'a> {
    /// Boxed trait object implementing [`DurableCatalogState`].
    pub storage: Box<dyn DurableCatalogState>,
    /// Metrics registry, borrowed for `'a`.
    pub metrics_registry: &'a MetricsRegistry,
    /// The remaining state-related settings.
    pub state: StateConfig,
}
45
/// State-related configuration values, carried as plain data.
///
/// NOTE(review): none of these fields is read inside this file. The per-field
/// descriptions below are derived from the field names and types only and
/// should be confirmed against the code that consumes them.
#[derive(Debug)]
pub struct StateConfig {
    /// Unsafe-mode flag (presumably gates unsafe functionality; confirm).
    pub unsafe_mode: bool,
    /// All-features flag (presumably turns on every feature; confirm).
    pub all_features: bool,
    /// Static build information.
    pub build_info: &'static BuildInfo,
    /// Deploy generation number.
    pub deploy_generation: u64,
    /// Identifier of the environment.
    pub environment_id: EnvironmentId,
    /// Read-only flag (presumably prevents writes; confirm).
    pub read_only: bool,
    /// Clock function used to obtain the current time.
    pub now: mz_ore::now::NowFn,
    /// Boot timestamp.
    pub boot_ts: mz_repr::Timestamp,
    /// Skip-migrations flag (presumably bypasses catalog migrations; confirm).
    pub skip_migrations: bool,
    /// Known cluster replica sizes, keyed by size name.
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    /// Bootstrap configuration for the builtin system cluster.
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    /// Bootstrap configuration for the builtin catalog server cluster.
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    /// Bootstrap configuration for the builtin probe cluster.
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    /// Bootstrap configuration for the builtin support cluster.
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    /// Bootstrap configuration for the builtin analytics cluster.
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    /// System parameter defaults as string pairs (presumably name -> value; confirm).
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Optional remotely supplied system parameters, in the same string-pair form.
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    /// Availability zone names.
    pub availability_zones: Vec<String>,
    /// Egress addresses, expressed as IP networks.
    pub egress_addresses: Vec<IpNet>,
    /// Optional AWS principal context.
    pub aws_principal_context: Option<AwsPrincipalContext>,
    /// Optional set of availability zones for AWS PrivateLink.
    pub aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    /// Optional HTTP host name.
    pub http_host_name: Option<String>,
    /// Connection context from `mz_storage_types`.
    pub connection_context: mz_storage_types::connections::ConnectionContext,
    /// Settings for builtin item migrations.
    pub builtin_item_migration_config: BuiltinItemMigrationConfig,
    /// Persist client handle.
    pub persist_client: PersistClient,
    /// Optional override for enabling the expression cache; `None` means no override
    /// is supplied here.
    pub enable_expression_cache_override: Option<bool>,
    /// Optional Helm chart version string.
    pub helm_chart_version: Option<String>,
    /// Optional external-login password for `mz_system`.
    pub external_login_password_mz_system: Option<Password>,
    /// Validated license key.
    pub license_key: ValidatedLicenseKey,
}
105
/// Settings for builtin item migrations.
///
/// NOTE(review): the fields are consumed outside this file; the descriptions
/// below follow from names and types only.
#[derive(Debug)]
pub struct BuiltinItemMigrationConfig {
    /// Persist client handle.
    pub persist_client: PersistClient,
    /// Read-only flag (presumably prevents the migration from writing; confirm).
    pub read_only: bool,
    /// Optional force-migration value (meaning of the string is not visible here; confirm).
    pub force_migration: Option<String>,
}
112
/// Newtype over an ordered map from replica size name to its [`ReplicaAllocation`].
#[derive(Debug, Clone, Serialize)]
pub struct ClusterReplicaSizeMap(pub BTreeMap<String, ReplicaAllocation>);
115
116impl ClusterReplicaSizeMap {
117 pub fn parse_from_str(s: &str, credit_consumption_from_memory: bool) -> anyhow::Result<Self> {
118 let mut cluster_replica_sizes: BTreeMap<String, ReplicaAllocation> =
119 serde_json::from_str(s)?;
120 if credit_consumption_from_memory {
121 for (name, replica) in cluster_replica_sizes.iter_mut() {
122 let Some(memory_limit) = replica.memory_limit else {
123 bail!("No memory limit found in cluster definition for {name}");
124 };
125 let total_memory = memory_limit.0 * replica.scale.get();
126 replica.credits_per_hour = Numeric::from(total_memory.0) / Numeric::from(GIB);
127 }
128 }
129 Ok(Self(cluster_replica_sizes))
130 }
131
132 pub fn enabled_allocations(&self) -> impl Iterator<Item = (&String, &ReplicaAllocation)> {
134 self.0.iter().filter(|(_, a)| !a.disabled)
135 }
136
137 pub fn get_allocation_by_name(&self, name: &str) -> Result<&ReplicaAllocation, CatalogError> {
140 self.0.get(name).ok_or_else(|| {
141 CatalogError::Catalog(SqlCatalogError::UnknownClusterReplicaSize(name.into()))
142 })
143 }
144
145 pub fn for_tests() -> Self {
150 let mut inner = (0..=5)
172 .flat_map(|i| {
173 let workers = 1 << i;
174 [
175 (format!("scale=1,workers={workers}"), None),
176 (format!("scale=1,workers={workers},mem=4GiB"), Some(4)),
177 (format!("scale=1,workers={workers},mem=8GiB"), Some(8)),
178 (format!("scale=1,workers={workers},mem=16GiB"), Some(16)),
179 (format!("scale=1,workers={workers},mem=32GiB"), Some(32)),
180 ]
181 .map(|(name, memory_limit)| {
182 (
183 name,
184 ReplicaAllocation {
185 memory_limit: memory_limit.map(|gib| MemoryLimit(ByteSize::gib(gib))),
186 memory_request: memory_limit.map(|gib| MemoryLimit(ByteSize::gib(gib))),
187 cpu_limit: None,
188 cpu_request: None,
189 disk_limit: None,
190 scale: NonZero::new(1).expect("not zero"),
191 workers: NonZero::new(workers).expect("not zero"),
192 credits_per_hour: 1.into(),
193 cpu_exclusive: false,
194 is_cc: false,
195 swap_enabled: false,
196 disabled: false,
197 selectors: BTreeMap::default(),
198 },
199 )
200 })
201 })
202 .collect::<BTreeMap<_, _>>();
203
204 for i in 1..=5 {
205 let scale = 1 << i;
206 inner.insert(
207 format!("scale={scale},workers=1"),
208 ReplicaAllocation {
209 memory_limit: None,
210 memory_request: None,
211 cpu_limit: None,
212 cpu_request: None,
213 disk_limit: None,
214 scale: NonZero::new(scale).expect("not zero"),
215 workers: NonZero::new(1).expect("not zero"),
216 credits_per_hour: scale.into(),
217 cpu_exclusive: false,
218 is_cc: false,
219 swap_enabled: false,
220 disabled: false,
221 selectors: BTreeMap::default(),
222 },
223 );
224
225 inner.insert(
226 format!("scale={scale},workers={scale}"),
227 ReplicaAllocation {
228 memory_limit: None,
229 memory_request: None,
230 cpu_limit: None,
231 cpu_request: None,
232 disk_limit: None,
233 scale: NonZero::new(scale).expect("not zero"),
234 workers: NonZero::new(scale.into()).expect("not zero"),
235 credits_per_hour: scale.into(),
236 cpu_exclusive: false,
237 is_cc: false,
238 swap_enabled: false,
239 disabled: false,
240 selectors: BTreeMap::default(),
241 },
242 );
243
244 inner.insert(
245 format!("scale=1,workers=8,mem={scale}GiB"),
246 ReplicaAllocation {
247 memory_limit: Some(MemoryLimit(ByteSize(u64::cast_from(scale) * (1 << 30)))),
248 memory_request: Some(MemoryLimit(ByteSize(u64::cast_from(scale) * (1 << 30)))),
249 cpu_limit: None,
250 cpu_request: None,
251 disk_limit: None,
252 scale: NonZero::new(1).expect("not zero"),
253 workers: NonZero::new(8).expect("not zero"),
254 credits_per_hour: 1.into(),
255 cpu_exclusive: false,
256 is_cc: false,
257 swap_enabled: false,
258 disabled: false,
259 selectors: BTreeMap::default(),
260 },
261 );
262 }
263
264 inner.insert(
265 "scale=2,workers=4".to_string(),
266 ReplicaAllocation {
267 memory_limit: None,
268 memory_request: None,
269 cpu_limit: None,
270 cpu_request: None,
271 disk_limit: None,
272 scale: NonZero::new(2).expect("not zero"),
273 workers: NonZero::new(4).expect("not zero"),
274 credits_per_hour: 2.into(),
275 cpu_exclusive: false,
276 is_cc: false,
277 swap_enabled: false,
278 disabled: false,
279 selectors: BTreeMap::default(),
280 },
281 );
282
283 inner.insert(
284 "free".to_string(),
285 ReplicaAllocation {
286 memory_limit: None,
287 memory_request: None,
288 cpu_limit: None,
289 cpu_request: None,
290 disk_limit: None,
291 scale: NonZero::new(1).expect("not zero"),
292 workers: NonZero::new(1).expect("not zero"),
293 credits_per_hour: 0.into(),
294 cpu_exclusive: false,
295 is_cc: true,
296 swap_enabled: false,
297 disabled: true,
298 selectors: BTreeMap::default(),
299 },
300 );
301
302 Self(inner)
303 }
304}
305
/// AWS account id and external-id prefix; together with a per-item suffix they
/// are formatted into a role ARN by `to_principal_string`.
#[derive(Debug, Clone, Serialize)]
pub struct AwsPrincipalContext {
    /// AWS account id, placed in the account position of the generated ARN.
    pub aws_account_id: String,
    /// Prefix placed directly after `mz_` in the generated role name.
    pub aws_external_id_prefix: AwsExternalIdPrefix,
}
315
316impl AwsPrincipalContext {
317 pub fn to_principal_string(&self, aws_external_id_suffix: CatalogItemId) -> String {
318 format!(
319 "arn:aws:iam::{}:role/mz_{}_{}",
320 self.aws_account_id, self.aws_external_id_prefix, aws_external_id_suffix
321 )
322 }
323}
324
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Checks that `parse_from_str` with `credit_consumption_from_memory = true`
    /// derives `credits_per_hour` from memory instead of the `"0"` given in the
    /// JSON: 1000MiB per process times scale 2 is 2000MiB, i.e. 2000/1024 GiB.
    #[mz_ore::test]
    // Skipped under Miri. NOTE(review): the reason is not recorded here.
    #[cfg_attr(miri, ignore)]
    fn cluster_replica_size_credits_from_memory() {
        let s = r#"{
            "test": {
                "memory_limit": "1000MiB",
                "scale": 2,
                "workers": 10,
                "credits_per_hour": "0"
            }
        }"#;
        let map = ClusterReplicaSizeMap::parse_from_str(s, true).unwrap();

        let alloc = map.get_allocation_by_name("test").unwrap();
        // 2000 MiB / 1024 MiB-per-GiB.
        let expected = Numeric::from(2000) / Numeric::from(1024);
        assert_eq!(alloc.credits_per_hour, expected);
    }
}