1use std::collections::{BTreeMap, BTreeSet};
11
12use anyhow::bail;
13use bytesize::ByteSize;
14use ipnet::IpNet;
15use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
16use mz_auth::password::Password;
17use mz_build_info::BuildInfo;
18use mz_cloud_resources::AwsExternalIdPrefix;
19use mz_controller::clusters::ReplicaAllocation;
20use mz_license_keys::ValidatedLicenseKey;
21use mz_orchestrator::MemoryLimit;
22use mz_ore::cast::CastFrom;
23use mz_ore::metrics::MetricsRegistry;
24use mz_persist_client::PersistClient;
25use mz_repr::CatalogItemId;
26use mz_repr::adt::numeric::Numeric;
27use mz_sql::catalog::CatalogError as SqlCatalogError;
28use mz_sql::catalog::EnvironmentId;
29use serde::Serialize;
30
31use crate::durable::{CatalogError, DurableCatalogState};
32
/// Number of bytes in one gibibyte (2^30).
const GIB: u64 = 1024 * 1024 * 1024;
34
/// Top-level configuration bundle for a catalog.
///
/// Groups a durable storage handle, a metrics registry, and the
/// [`StateConfig`] into a single value.
#[derive(Debug)]
pub struct Config<'a> {
    /// Boxed [`DurableCatalogState`] implementation used as the catalog's storage.
    pub storage: Box<dyn DurableCatalogState>,
    /// Borrowed metrics registry.
    // NOTE(review): presumably where catalog metrics are registered — confirm with consumers.
    pub metrics_registry: &'a MetricsRegistry,
    /// The remaining settings; see [`StateConfig`].
    pub state: StateConfig,
}
44
/// Settings carried in [`Config::state`].
///
/// This type only declares the fields; how each one is interpreted is decided
/// by the code that consumes it.
// NOTE(review): the field descriptions below are derived from the field names
// and types alone — confirm the details against the consumers of this struct.
#[derive(Debug)]
pub struct StateConfig {
    /// Unsafe-mode flag.
    pub unsafe_mode: bool,
    /// "All features" flag.
    pub all_features: bool,
    /// Static build information for this binary.
    pub build_info: &'static BuildInfo,
    /// Deploy generation number.
    pub deploy_generation: u64,
    /// Identifier of the environment.
    pub environment_id: EnvironmentId,
    /// Read-only flag.
    pub read_only: bool,
    /// Clock function (`NowFn`) used to obtain the current time.
    pub now: mz_ore::now::NowFn,
    /// Boot timestamp.
    pub boot_ts: mz_repr::Timestamp,
    /// Flag for skipping migrations.
    pub skip_migrations: bool,
    /// Known cluster replica sizes, keyed by size name.
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    /// Bootstrap configuration for the builtin system cluster.
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    /// Bootstrap configuration for the builtin catalog server cluster.
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    /// Bootstrap configuration for the builtin probe cluster.
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    /// Bootstrap configuration for the builtin support cluster.
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    /// Bootstrap configuration for the builtin analytics cluster.
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    /// Default values for system parameters, as a name-to-value string map.
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Optional name-to-value map of remotely sourced system parameters.
    // NOTE(review): the meaning of `None` is not visible in this file — confirm.
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    /// Availability zone names.
    pub availability_zones: Vec<String>,
    /// Egress network addresses.
    pub egress_addresses: Vec<IpNet>,
    /// Optional inputs for building AWS principal strings; see
    /// [`AwsPrincipalContext`].
    pub aws_principal_context: Option<AwsPrincipalContext>,
    /// Optional set of availability zones for AWS PrivateLink.
    pub aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    /// Optional HTTP host name.
    pub http_host_name: Option<String>,
    /// Connection context from `mz_storage_types`.
    pub connection_context: mz_storage_types::connections::ConnectionContext,
    /// Settings for builtin item migrations; see [`BuiltinItemMigrationConfig`].
    pub builtin_item_migration_config: BuiltinItemMigrationConfig,
    /// Persist client handle.
    pub persist_client: PersistClient,
    /// Optional override for enabling the expression cache.
    // NOTE(review): presumably `None` means "no override" — confirm.
    pub enable_expression_cache_override: Option<bool>,
    /// Optional Helm chart version string.
    pub helm_chart_version: Option<String>,
    /// Optional password for external login of the `mz_system` user.
    pub external_login_password_mz_system: Option<Password>,
    /// Validated license key.
    pub license_key: ValidatedLicenseKey,
}
104
/// Settings for builtin item migrations, stored in
/// [`StateConfig::builtin_item_migration_config`].
// NOTE(review): field semantics are not visible in this file; the descriptions
// below follow the field names and types only.
#[derive(Debug)]
pub struct BuiltinItemMigrationConfig {
    /// Persist client handle.
    pub persist_client: PersistClient,
    /// Read-only flag.
    pub read_only: bool,
    /// Optional string requesting a forced migration.
    // NOTE(review): the accepted values are defined elsewhere — confirm.
    pub force_migration: Option<String>,
}
111
/// Map from cluster replica size name to the [`ReplicaAllocation`] describing
/// that size.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterReplicaSizeMap(pub BTreeMap<String, ReplicaAllocation>);
114
115impl ClusterReplicaSizeMap {
116 pub fn parse_from_str(s: &str, credit_consumption_from_memory: bool) -> anyhow::Result<Self> {
117 let mut cluster_replica_sizes: BTreeMap<String, ReplicaAllocation> =
118 serde_json::from_str(s)?;
119 if credit_consumption_from_memory {
120 for (name, replica) in cluster_replica_sizes.iter_mut() {
121 let Some(memory_limit) = replica.memory_limit else {
122 bail!("No memory limit found in cluster definition for {name}");
123 };
124 replica.credits_per_hour = Numeric::from(
125 (memory_limit.0 * replica.scale * u64::try_from(replica.workers)?).0,
126 ) / Numeric::from(1 * GIB);
127 }
128 }
129 Ok(Self(cluster_replica_sizes))
130 }
131
132 pub fn enabled_allocations(&self) -> impl Iterator<Item = (&String, &ReplicaAllocation)> {
134 self.0.iter().filter(|(_, a)| !a.disabled)
135 }
136
137 pub fn get_allocation_by_name(&self, name: &str) -> Result<&ReplicaAllocation, CatalogError> {
140 self.0.get(name).ok_or_else(|| {
141 CatalogError::Catalog(SqlCatalogError::UnknownClusterReplicaSize(name.into()))
142 })
143 }
144
145 pub fn for_tests() -> Self {
150 let mut inner = (0..=5)
172 .flat_map(|i| {
173 let workers: u8 = 1 << i;
174 [
175 (format!("scale=1,workers={workers}"), None),
176 (format!("scale=1,workers={workers},mem=4GiB"), Some(4)),
177 (format!("scale=1,workers={workers},mem=8GiB"), Some(8)),
178 (format!("scale=1,workers={workers},mem=16GiB"), Some(16)),
179 (format!("scale=1,workers={workers},mem=32GiB"), Some(32)),
180 ]
181 .map(|(name, memory_limit)| {
182 (
183 name,
184 ReplicaAllocation {
185 memory_limit: memory_limit.map(|gib| MemoryLimit(ByteSize::gib(gib))),
186 cpu_limit: None,
187 disk_limit: None,
188 scale: 1,
189 workers: workers.into(),
190 credits_per_hour: 1.into(),
191 cpu_exclusive: false,
192 is_cc: false,
193 swap_enabled: false,
194 disabled: false,
195 selectors: BTreeMap::default(),
196 },
197 )
198 })
199 })
200 .collect::<BTreeMap<_, _>>();
201
202 for i in 1..=5 {
203 let scale = 1 << i;
204 inner.insert(
205 format!("scale={scale},workers=1"),
206 ReplicaAllocation {
207 memory_limit: None,
208 cpu_limit: None,
209 disk_limit: None,
210 scale,
211 workers: 1,
212 credits_per_hour: scale.into(),
213 cpu_exclusive: false,
214 is_cc: false,
215 swap_enabled: false,
216 disabled: false,
217 selectors: BTreeMap::default(),
218 },
219 );
220
221 inner.insert(
222 format!("scale={scale},workers={scale}"),
223 ReplicaAllocation {
224 memory_limit: None,
225 cpu_limit: None,
226 disk_limit: None,
227 scale,
228 workers: scale.into(),
229 credits_per_hour: scale.into(),
230 cpu_exclusive: false,
231 is_cc: false,
232 swap_enabled: false,
233 disabled: false,
234 selectors: BTreeMap::default(),
235 },
236 );
237
238 inner.insert(
239 format!("scale=1,workers=8,mem={scale}GiB"),
240 ReplicaAllocation {
241 memory_limit: Some(MemoryLimit(ByteSize(u64::cast_from(scale) * (1 << 30)))),
242 cpu_limit: None,
243 disk_limit: None,
244 scale: 1,
245 workers: 8,
246 credits_per_hour: 1.into(),
247 cpu_exclusive: false,
248 is_cc: false,
249 swap_enabled: false,
250 disabled: false,
251 selectors: BTreeMap::default(),
252 },
253 );
254 }
255
256 inner.insert(
257 "scale=2,workers=4".to_string(),
258 ReplicaAllocation {
259 memory_limit: None,
260 cpu_limit: None,
261 disk_limit: None,
262 scale: 2,
263 workers: 4,
264 credits_per_hour: 2.into(),
265 cpu_exclusive: false,
266 is_cc: false,
267 swap_enabled: false,
268 disabled: false,
269 selectors: BTreeMap::default(),
270 },
271 );
272
273 inner.insert(
274 "free".to_string(),
275 ReplicaAllocation {
276 memory_limit: None,
277 cpu_limit: None,
278 disk_limit: None,
279 scale: 0,
280 workers: 0,
281 credits_per_hour: 0.into(),
282 cpu_exclusive: false,
283 is_cc: true,
284 swap_enabled: false,
285 disabled: true,
286 selectors: BTreeMap::default(),
287 },
288 );
289
290 Self(inner)
291 }
292}
293
/// Inputs used to build an AWS principal string.
///
/// See [`AwsPrincipalContext::to_principal_string`] for the resulting format.
#[derive(Debug, Clone, Serialize)]
pub struct AwsPrincipalContext {
    /// AWS account ID placed in the account position of the ARN.
    pub aws_account_id: String,
    /// Prefix that forms the first part of the role name, after the `mz_`
    /// literal.
    pub aws_external_id_prefix: AwsExternalIdPrefix,
}
303
304impl AwsPrincipalContext {
305 pub fn to_principal_string(&self, aws_external_id_suffix: CatalogItemId) -> String {
306 format!(
307 "arn:aws:iam::{}:role/mz_{}_{}",
308 self.aws_account_id, self.aws_external_id_prefix, aws_external_id_suffix
309 )
310 }
311}