1use std::collections::{BTreeMap, BTreeSet};
11use std::num::NonZero;
12
13use anyhow::bail;
14use bytesize::ByteSize;
15use ipnet::IpNet;
16use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
17use mz_auth::password::Password;
18use mz_build_info::BuildInfo;
19use mz_cloud_resources::AwsExternalIdPrefix;
20use mz_controller::clusters::ReplicaAllocation;
21use mz_license_keys::ValidatedLicenseKey;
22use mz_orchestrator::MemoryLimit;
23use mz_ore::cast::CastFrom;
24use mz_ore::metrics::MetricsRegistry;
25use mz_persist_client::PersistClient;
26use mz_repr::CatalogItemId;
27use mz_repr::adt::numeric::Numeric;
28use mz_sql::catalog::CatalogError as SqlCatalogError;
29use mz_sql::catalog::EnvironmentId;
30use serde::Serialize;
31
32use crate::durable::{CatalogError, DurableCatalogState};
33
34const GIB: u64 = 1024 * 1024 * 1024;
35
36#[derive(Debug)]
38pub struct Config<'a> {
39 pub storage: Box<dyn DurableCatalogState>,
41 pub metrics_registry: &'a MetricsRegistry,
43 pub state: StateConfig,
44}
45
46#[derive(Debug)]
47pub struct StateConfig {
48 pub unsafe_mode: bool,
50 pub all_features: bool,
52 pub build_info: &'static BuildInfo,
54 pub environment_id: EnvironmentId,
56 pub read_only: bool,
58 pub now: mz_ore::now::NowFn,
60 pub boot_ts: mz_repr::Timestamp,
62 pub skip_migrations: bool,
64 pub cluster_replica_sizes: ClusterReplicaSizeMap,
66 pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
68 pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
70 pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
72 pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
74 pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
76 pub system_parameter_defaults: BTreeMap<String, String>,
78 pub remote_system_parameters: Option<BTreeMap<String, String>>,
81 pub availability_zones: Vec<String>,
83 pub egress_addresses: Vec<IpNet>,
85 pub aws_principal_context: Option<AwsPrincipalContext>,
87 pub aws_privatelink_availability_zones: Option<BTreeSet<String>>,
89 pub http_host_name: Option<String>,
91 pub connection_context: mz_storage_types::connections::ConnectionContext,
93 pub builtin_item_migration_config: BuiltinItemMigrationConfig,
94 pub persist_client: PersistClient,
95 pub enable_expression_cache_override: Option<bool>,
98 pub helm_chart_version: Option<String>,
100 pub external_login_password_mz_system: Option<Password>,
101 pub license_key: ValidatedLicenseKey,
102}
103
104#[derive(Debug)]
105pub struct BuiltinItemMigrationConfig {
106 pub persist_client: PersistClient,
107 pub read_only: bool,
108 pub force_migration: Option<String>,
109}
110
111#[derive(Debug, Clone, Serialize)]
112pub struct ClusterReplicaSizeMap(pub BTreeMap<String, ReplicaAllocation>);
113
114impl ClusterReplicaSizeMap {
115 pub fn parse_from_str(s: &str, credit_consumption_from_memory: bool) -> anyhow::Result<Self> {
116 let mut cluster_replica_sizes: BTreeMap<String, ReplicaAllocation> =
117 serde_json::from_str(s)?;
118 if credit_consumption_from_memory {
119 for (name, replica) in cluster_replica_sizes.iter_mut() {
120 let Some(memory_limit) = replica.memory_limit else {
121 bail!("No memory limit found in cluster definition for {name}");
122 };
123 let total_memory = memory_limit.0 * replica.scale.get();
124 replica.credits_per_hour = Numeric::from(total_memory.0) / Numeric::from(GIB);
125 }
126 }
127 Ok(Self(cluster_replica_sizes))
128 }
129
130 pub fn enabled_allocations(&self) -> impl Iterator<Item = (&String, &ReplicaAllocation)> {
132 self.0.iter().filter(|(_, a)| !a.disabled)
133 }
134
135 pub fn get_allocation_by_name(&self, name: &str) -> Result<&ReplicaAllocation, CatalogError> {
138 self.0.get(name).ok_or_else(|| {
139 CatalogError::Catalog(SqlCatalogError::UnknownClusterReplicaSize(name.into()))
140 })
141 }
142
143 pub fn for_tests() -> Self {
148 let mut inner = (0..=5)
170 .flat_map(|i| {
171 let workers = 1 << i;
172 [
173 (format!("scale=1,workers={workers}"), None),
174 (format!("scale=1,workers={workers},mem=4GiB"), Some(4)),
175 (format!("scale=1,workers={workers},mem=8GiB"), Some(8)),
176 (format!("scale=1,workers={workers},mem=16GiB"), Some(16)),
177 (format!("scale=1,workers={workers},mem=32GiB"), Some(32)),
178 ]
179 .map(|(name, memory_limit)| {
180 (
181 name,
182 ReplicaAllocation {
183 memory_limit: memory_limit.map(|gib| MemoryLimit(ByteSize::gib(gib))),
184 memory_request: memory_limit.map(|gib| MemoryLimit(ByteSize::gib(gib))),
185 cpu_limit: None,
186 cpu_request: None,
187 disk_limit: None,
188 scale: NonZero::new(1).expect("not zero"),
189 workers: NonZero::new(workers).expect("not zero"),
190 credits_per_hour: 1.into(),
191 cpu_exclusive: false,
192 is_cc: false,
193 swap_enabled: false,
194 disabled: false,
195 selectors: BTreeMap::default(),
196 },
197 )
198 })
199 })
200 .collect::<BTreeMap<_, _>>();
201
202 for i in 1..=5 {
203 let scale = 1 << i;
204 inner.insert(
205 format!("scale={scale},workers=1"),
206 ReplicaAllocation {
207 memory_limit: None,
208 memory_request: None,
209 cpu_limit: None,
210 cpu_request: None,
211 disk_limit: None,
212 scale: NonZero::new(scale).expect("not zero"),
213 workers: NonZero::new(1).expect("not zero"),
214 credits_per_hour: scale.into(),
215 cpu_exclusive: false,
216 is_cc: false,
217 swap_enabled: false,
218 disabled: false,
219 selectors: BTreeMap::default(),
220 },
221 );
222
223 inner.insert(
224 format!("scale={scale},workers={scale}"),
225 ReplicaAllocation {
226 memory_limit: None,
227 memory_request: None,
228 cpu_limit: None,
229 cpu_request: None,
230 disk_limit: None,
231 scale: NonZero::new(scale).expect("not zero"),
232 workers: NonZero::new(scale.into()).expect("not zero"),
233 credits_per_hour: scale.into(),
234 cpu_exclusive: false,
235 is_cc: false,
236 swap_enabled: false,
237 disabled: false,
238 selectors: BTreeMap::default(),
239 },
240 );
241
242 inner.insert(
243 format!("scale=1,workers=8,mem={scale}GiB"),
244 ReplicaAllocation {
245 memory_limit: Some(MemoryLimit(ByteSize(u64::cast_from(scale) * (1 << 30)))),
246 memory_request: Some(MemoryLimit(ByteSize(u64::cast_from(scale) * (1 << 30)))),
247 cpu_limit: None,
248 cpu_request: None,
249 disk_limit: None,
250 scale: NonZero::new(1).expect("not zero"),
251 workers: NonZero::new(8).expect("not zero"),
252 credits_per_hour: 1.into(),
253 cpu_exclusive: false,
254 is_cc: false,
255 swap_enabled: false,
256 disabled: false,
257 selectors: BTreeMap::default(),
258 },
259 );
260 }
261
262 inner.insert(
263 "scale=2,workers=4".to_string(),
264 ReplicaAllocation {
265 memory_limit: None,
266 memory_request: None,
267 cpu_limit: None,
268 cpu_request: None,
269 disk_limit: None,
270 scale: NonZero::new(2).expect("not zero"),
271 workers: NonZero::new(4).expect("not zero"),
272 credits_per_hour: 2.into(),
273 cpu_exclusive: false,
274 is_cc: false,
275 swap_enabled: false,
276 disabled: false,
277 selectors: BTreeMap::default(),
278 },
279 );
280
281 inner.insert(
282 "free".to_string(),
283 ReplicaAllocation {
284 memory_limit: None,
285 memory_request: None,
286 cpu_limit: None,
287 cpu_request: None,
288 disk_limit: None,
289 scale: NonZero::new(1).expect("not zero"),
290 workers: NonZero::new(1).expect("not zero"),
291 credits_per_hour: 0.into(),
292 cpu_exclusive: false,
293 is_cc: true,
294 swap_enabled: false,
295 disabled: true,
296 selectors: BTreeMap::default(),
297 },
298 );
299
300 Self(inner)
301 }
302}
303
304#[derive(Debug, Clone, Serialize)]
309pub struct AwsPrincipalContext {
310 pub aws_account_id: String,
311 pub aws_external_id_prefix: AwsExternalIdPrefix,
312}
313
314impl AwsPrincipalContext {
315 pub fn to_principal_string(&self, aws_external_id_suffix: CatalogItemId) -> String {
316 format!(
317 "arn:aws:iam::{}:role/mz_{}_{}",
318 self.aws_account_id, self.aws_external_id_prefix, aws_external_id_suffix
319 )
320 }
321}
322
323#[cfg(test)]
324#[allow(clippy::unwrap_used)]
325mod tests {
326 use super::*;
327
328 #[mz_ore::test]
329 #[cfg_attr(miri, ignore)] fn cluster_replica_size_credits_from_memory() {
331 let s = r#"{
332 "test": {
333 "memory_limit": "1000MiB",
334 "scale": 2,
335 "workers": 10,
336 "credits_per_hour": "0"
337 }
338 }"#;
339 let map = ClusterReplicaSizeMap::parse_from_str(s, true).unwrap();
340
341 let alloc = map.get_allocation_by_name("test").unwrap();
342 let expected = Numeric::from(2000) / Numeric::from(1024);
343 assert_eq!(alloc.credits_per_hour, expected);
344 }
345}