1use std::collections::{BTreeMap, BTreeSet};
11
12use anyhow::bail;
13use bytesize::ByteSize;
14use ipnet::IpNet;
15use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
16use mz_auth::password::Password;
17use mz_build_info::BuildInfo;
18use mz_cloud_resources::AwsExternalIdPrefix;
19use mz_controller::clusters::ReplicaAllocation;
20use mz_license_keys::ValidatedLicenseKey;
21use mz_orchestrator::MemoryLimit;
22use mz_ore::cast::CastFrom;
23use mz_ore::metrics::MetricsRegistry;
24use mz_persist_client::PersistClient;
25use mz_repr::CatalogItemId;
26use mz_repr::adt::numeric::Numeric;
27use mz_sql::catalog::CatalogError as SqlCatalogError;
28use mz_sql::catalog::EnvironmentId;
29use serde::Serialize;
30
31use crate::durable::{CatalogError, DurableCatalogState};
32
/// Number of bytes in one gibibyte (2^30).
const GIB: u64 = 1 << 30;
34
35#[derive(Debug)]
37pub struct Config<'a> {
38 pub storage: Box<dyn DurableCatalogState>,
40 pub metrics_registry: &'a MetricsRegistry,
42 pub state: StateConfig,
43}
44
45#[derive(Debug)]
46pub struct StateConfig {
47 pub unsafe_mode: bool,
49 pub all_features: bool,
51 pub build_info: &'static BuildInfo,
53 pub environment_id: EnvironmentId,
55 pub read_only: bool,
57 pub now: mz_ore::now::NowFn,
59 pub boot_ts: mz_repr::Timestamp,
61 pub skip_migrations: bool,
63 pub cluster_replica_sizes: ClusterReplicaSizeMap,
65 pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
67 pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
69 pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
71 pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
73 pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
75 pub system_parameter_defaults: BTreeMap<String, String>,
77 pub remote_system_parameters: Option<BTreeMap<String, String>>,
80 pub availability_zones: Vec<String>,
82 pub egress_addresses: Vec<IpNet>,
84 pub aws_principal_context: Option<AwsPrincipalContext>,
86 pub aws_privatelink_availability_zones: Option<BTreeSet<String>>,
88 pub http_host_name: Option<String>,
90 pub connection_context: mz_storage_types::connections::ConnectionContext,
92 pub builtin_item_migration_config: BuiltinItemMigrationConfig,
93 pub persist_client: PersistClient,
94 pub enable_expression_cache_override: Option<bool>,
97 pub helm_chart_version: Option<String>,
99 pub external_login_password_mz_system: Option<Password>,
100 pub license_key: ValidatedLicenseKey,
101}
102
103#[derive(Debug)]
104pub struct BuiltinItemMigrationConfig {
105 pub persist_client: PersistClient,
106 pub read_only: bool,
107}
108
109#[derive(Debug, Clone, Serialize)]
110pub struct ClusterReplicaSizeMap(pub BTreeMap<String, ReplicaAllocation>);
111
112impl ClusterReplicaSizeMap {
113 pub fn parse_from_str(s: &str, credit_consumption_from_memory: bool) -> anyhow::Result<Self> {
114 let mut cluster_replica_sizes: BTreeMap<String, ReplicaAllocation> =
115 serde_json::from_str(s)?;
116 if credit_consumption_from_memory {
117 for (name, replica) in cluster_replica_sizes.iter_mut() {
118 let Some(memory_limit) = replica.memory_limit else {
119 bail!("No memory limit found in cluster definition for {name}");
120 };
121 replica.credits_per_hour = Numeric::from(
122 (memory_limit.0 * replica.scale * u64::try_from(replica.workers)?).0,
123 ) / Numeric::from(1 * GIB);
124 }
125 }
126 Ok(Self(cluster_replica_sizes))
127 }
128
129 pub fn enabled_allocations(&self) -> impl Iterator<Item = (&String, &ReplicaAllocation)> {
131 self.0.iter().filter(|(_, a)| !a.disabled)
132 }
133
134 pub fn get_allocation_by_name(&self, name: &str) -> Result<&ReplicaAllocation, CatalogError> {
137 self.0.get(name).ok_or_else(|| {
138 CatalogError::Catalog(SqlCatalogError::UnknownClusterReplicaSize(name.into()))
139 })
140 }
141
142 pub fn for_tests() -> Self {
147 let mut inner = (0..=5)
169 .flat_map(|i| {
170 let workers: u8 = 1 << i;
171 [
172 (format!("scale=1,workers={workers}"), None),
173 (format!("scale=1,workers={workers},mem=4GiB"), Some(4)),
174 (format!("scale=1,workers={workers},mem=8GiB"), Some(8)),
175 (format!("scale=1,workers={workers},mem=16GiB"), Some(16)),
176 (format!("scale=1,workers={workers},mem=32GiB"), Some(32)),
177 ]
178 .map(|(name, memory_limit)| {
179 (
180 name,
181 ReplicaAllocation {
182 memory_limit: memory_limit.map(|gib| MemoryLimit(ByteSize::gib(gib))),
183 cpu_limit: None,
184 disk_limit: None,
185 scale: 1,
186 workers: workers.into(),
187 credits_per_hour: 1.into(),
188 cpu_exclusive: false,
189 is_cc: false,
190 swap_enabled: false,
191 disabled: false,
192 selectors: BTreeMap::default(),
193 },
194 )
195 })
196 })
197 .collect::<BTreeMap<_, _>>();
198
199 for i in 1..=5 {
200 let scale = 1 << i;
201 inner.insert(
202 format!("scale={scale},workers=1"),
203 ReplicaAllocation {
204 memory_limit: None,
205 cpu_limit: None,
206 disk_limit: None,
207 scale,
208 workers: 1,
209 credits_per_hour: scale.into(),
210 cpu_exclusive: false,
211 is_cc: false,
212 swap_enabled: false,
213 disabled: false,
214 selectors: BTreeMap::default(),
215 },
216 );
217
218 inner.insert(
219 format!("scale={scale},workers={scale}"),
220 ReplicaAllocation {
221 memory_limit: None,
222 cpu_limit: None,
223 disk_limit: None,
224 scale,
225 workers: scale.into(),
226 credits_per_hour: scale.into(),
227 cpu_exclusive: false,
228 is_cc: false,
229 swap_enabled: false,
230 disabled: false,
231 selectors: BTreeMap::default(),
232 },
233 );
234
235 inner.insert(
236 format!("scale=1,workers=8,mem={scale}GiB"),
237 ReplicaAllocation {
238 memory_limit: Some(MemoryLimit(ByteSize(u64::cast_from(scale) * (1 << 30)))),
239 cpu_limit: None,
240 disk_limit: None,
241 scale: 1,
242 workers: 8,
243 credits_per_hour: 1.into(),
244 cpu_exclusive: false,
245 is_cc: false,
246 swap_enabled: false,
247 disabled: false,
248 selectors: BTreeMap::default(),
249 },
250 );
251 }
252
253 inner.insert(
254 "scale=2,workers=4".to_string(),
255 ReplicaAllocation {
256 memory_limit: None,
257 cpu_limit: None,
258 disk_limit: None,
259 scale: 2,
260 workers: 4,
261 credits_per_hour: 2.into(),
262 cpu_exclusive: false,
263 is_cc: false,
264 swap_enabled: false,
265 disabled: false,
266 selectors: BTreeMap::default(),
267 },
268 );
269
270 inner.insert(
271 "free".to_string(),
272 ReplicaAllocation {
273 memory_limit: None,
274 cpu_limit: None,
275 disk_limit: None,
276 scale: 0,
277 workers: 0,
278 credits_per_hour: 0.into(),
279 cpu_exclusive: false,
280 is_cc: true,
281 swap_enabled: false,
282 disabled: true,
283 selectors: BTreeMap::default(),
284 },
285 );
286
287 Self(inner)
288 }
289}
290
291#[derive(Debug, Clone, Serialize)]
296pub struct AwsPrincipalContext {
297 pub aws_account_id: String,
298 pub aws_external_id_prefix: AwsExternalIdPrefix,
299}
300
301impl AwsPrincipalContext {
302 pub fn to_principal_string(&self, aws_external_id_suffix: CatalogItemId) -> String {
303 format!(
304 "arn:aws:iam::{}:role/mz_{}_{}",
305 self.aws_account_id, self.aws_external_id_prefix, aws_external_id_suffix
306 )
307 }
308}