1use std::collections::{BTreeMap, BTreeSet};
11
12use anyhow::bail;
13use bytesize::ByteSize;
14use ipnet::IpNet;
15use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
16use mz_build_info::BuildInfo;
17use mz_cloud_resources::AwsExternalIdPrefix;
18use mz_controller::clusters::ReplicaAllocation;
19use mz_orchestrator::MemoryLimit;
20use mz_ore::cast::CastFrom;
21use mz_ore::metrics::MetricsRegistry;
22use mz_persist_client::PersistClient;
23use mz_repr::CatalogItemId;
24use mz_repr::adt::numeric::Numeric;
25use mz_sql::catalog::CatalogError as SqlCatalogError;
26use mz_sql::catalog::EnvironmentId;
27use serde::Serialize;
28
29use crate::durable::{CatalogError, DurableCatalogState};
30
/// Number of bytes in one gibibyte (2^30).
const GIB: u64 = 1 << 30;
32
/// Configuration bundle pairing a durable catalog state handle and a metrics
/// registry with the remaining settings in [`StateConfig`].
#[derive(Debug)]
pub struct Config<'a> {
    /// Handle to the durable catalog state, held as a boxed
    /// [`DurableCatalogState`] trait object.
    pub storage: Box<dyn DurableCatalogState>,
    /// Borrowed metrics registry.
    // NOTE(review): presumably where catalog metrics get registered — confirm
    // against the code that consumes this config.
    pub metrics_registry: &'a MetricsRegistry,
    /// All remaining settings; see [`StateConfig`].
    pub state: StateConfig,
}
42
/// Settings carried in [`Config::state`].
///
/// NOTE(review): the code that consumes these fields lives outside this file,
/// so the descriptions below are derived from field names and types only.
/// Confirm the exact semantics against the consumer before relying on them.
#[derive(Debug)]
pub struct StateConfig {
    /// Flag for "unsafe mode"; what it unlocks is decided by the consumer.
    pub unsafe_mode: bool,
    /// Flag named for "all features".
    // NOTE(review): presumably force-enables feature-gated behavior — confirm.
    pub all_features: bool,
    /// Reference to static build metadata ([`BuildInfo`]).
    pub build_info: &'static BuildInfo,
    /// Identifier of the environment ([`EnvironmentId`]).
    pub environment_id: EnvironmentId,
    /// Read-only flag.
    // NOTE(review): presumably suppresses writes when set — confirm.
    pub read_only: bool,
    /// Clock function ([`mz_ore::now::NowFn`]).
    pub now: mz_ore::now::NowFn,
    /// Timestamp associated with boot (per the field name).
    pub boot_ts: mz_repr::Timestamp,
    /// Flag for skipping migrations (per the field name).
    pub skip_migrations: bool,
    /// Named replica sizes and their allocations; see [`ClusterReplicaSizeMap`].
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    /// Bootstrap configuration for the builtin system cluster.
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    /// Bootstrap configuration for the builtin catalog server cluster.
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    /// Bootstrap configuration for the builtin probe cluster.
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    /// Bootstrap configuration for the builtin support cluster.
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    /// Bootstrap configuration for the builtin analytics cluster.
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    /// System parameter defaults, as string key/value pairs.
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Optional string key/value map of remotely sourced system parameters.
    // NOTE(review): what `None` signifies is not visible in this file.
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    /// Availability zones, as strings.
    pub availability_zones: Vec<String>,
    /// Egress addresses, expressed as IP networks ([`IpNet`]).
    pub egress_addresses: Vec<IpNet>,
    /// Optional inputs for rendering AWS principal strings; see
    /// [`AwsPrincipalContext`].
    pub aws_principal_context: Option<AwsPrincipalContext>,
    /// Optional set of availability zone strings for AWS PrivateLink.
    pub aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    /// Optional HTTP host name.
    pub http_host_name: Option<String>,
    /// Connection context from `mz_storage_types`.
    pub connection_context: mz_storage_types::connections::ConnectionContext,
    /// Settings for builtin item migrations; see [`BuiltinItemMigrationConfig`].
    pub builtin_item_migration_config: BuiltinItemMigrationConfig,
    /// Persist client handle.
    pub persist_client: PersistClient,
    /// Optional override for enabling the expression cache.
    // NOTE(review): `None` presumably means "no override" — confirm.
    pub enable_expression_cache_override: Option<bool>,
    /// Flag for "0dt" deployment.
    // NOTE(review): "0dt" presumably abbreviates zero-downtime — confirm.
    pub enable_0dt_deployment: bool,
    /// Optional Helm chart version string.
    pub helm_chart_version: Option<String>,
}
100
/// Settings for builtin item migrations: a persist client plus a read-only
/// flag.
#[derive(Debug)]
pub struct BuiltinItemMigrationConfig {
    /// Persist client handle.
    // NOTE(review): presumably the client the migration reads/writes through —
    // confirm against the migration code.
    pub persist_client: PersistClient,
    /// Read-only flag.
    // NOTE(review): presumably prevents the migration from writing — confirm.
    pub read_only: bool,
}
106
/// Newtype around a map from cluster replica size name to the
/// [`ReplicaAllocation`] that size stands for.
///
/// The inner map is public, so callers may read or build it directly.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterReplicaSizeMap(pub BTreeMap<String, ReplicaAllocation>);
109
110impl ClusterReplicaSizeMap {
111 pub fn parse_from_str(s: &str, credit_consumption_from_memory: bool) -> anyhow::Result<Self> {
112 let mut cluster_replica_sizes: BTreeMap<String, ReplicaAllocation> =
113 serde_json::from_str(s)?;
114 if credit_consumption_from_memory {
115 for (name, replica) in cluster_replica_sizes.iter_mut() {
116 let Some(memory_limit) = replica.memory_limit else {
117 bail!("No memory limit found in cluster definition for {name}");
118 };
119 replica.credits_per_hour = Numeric::from(
120 (memory_limit.0 * replica.scale * u64::try_from(replica.workers)?).0,
121 ) / Numeric::from(1 * GIB);
122 }
123 }
124 Ok(Self(cluster_replica_sizes))
125 }
126
127 pub fn enabled_allocations(&self) -> impl Iterator<Item = (&String, &ReplicaAllocation)> {
129 self.0.iter().filter(|(_, a)| !a.disabled)
130 }
131
132 pub fn get_allocation_by_name(&self, name: &str) -> Result<&ReplicaAllocation, CatalogError> {
135 self.0.get(name).ok_or_else(|| {
136 CatalogError::Catalog(SqlCatalogError::UnknownClusterReplicaSize(name.into()))
137 })
138 }
139
140 pub fn for_tests() -> Self {
145 let mut inner = (0..=5)
167 .flat_map(|i| {
168 let workers: u8 = 1 << i;
169 [
170 (workers.to_string(), None),
171 (format!("{workers}-4G"), Some(4)),
172 (format!("{workers}-8G"), Some(8)),
173 (format!("{workers}-16G"), Some(16)),
174 (format!("{workers}-32G"), Some(32)),
175 ]
176 .map(|(name, memory_limit)| {
177 (
178 name,
179 ReplicaAllocation {
180 memory_limit: memory_limit.map(|gib| MemoryLimit(ByteSize::gib(gib))),
181 cpu_limit: None,
182 disk_limit: None,
183 scale: 1,
184 workers: workers.into(),
185 credits_per_hour: 1.into(),
186 cpu_exclusive: false,
187 is_cc: false,
188 disabled: false,
189 selectors: BTreeMap::default(),
190 },
191 )
192 })
193 })
194 .collect::<BTreeMap<_, _>>();
195
196 for i in 1..=5 {
197 let scale = 1 << i;
198 inner.insert(
199 format!("{scale}-1"),
200 ReplicaAllocation {
201 memory_limit: None,
202 cpu_limit: None,
203 disk_limit: None,
204 scale,
205 workers: 1,
206 credits_per_hour: scale.into(),
207 cpu_exclusive: false,
208 is_cc: false,
209 disabled: false,
210 selectors: BTreeMap::default(),
211 },
212 );
213
214 inner.insert(
215 format!("{scale}-{scale}"),
216 ReplicaAllocation {
217 memory_limit: None,
218 cpu_limit: None,
219 disk_limit: None,
220 scale,
221 workers: scale.into(),
222 credits_per_hour: scale.into(),
223 cpu_exclusive: false,
224 is_cc: false,
225 disabled: false,
226 selectors: BTreeMap::default(),
227 },
228 );
229
230 inner.insert(
231 format!("mem-{scale}"),
232 ReplicaAllocation {
233 memory_limit: Some(MemoryLimit(ByteSize(u64::cast_from(scale) * (1 << 30)))),
234 cpu_limit: None,
235 disk_limit: None,
236 scale: 1,
237 workers: 8,
238 credits_per_hour: 1.into(),
239 cpu_exclusive: false,
240 is_cc: false,
241 disabled: false,
242 selectors: BTreeMap::default(),
243 },
244 );
245 }
246
247 inner.insert(
248 "2-4".to_string(),
249 ReplicaAllocation {
250 memory_limit: None,
251 cpu_limit: None,
252 disk_limit: None,
253 scale: 2,
254 workers: 4,
255 credits_per_hour: 2.into(),
256 cpu_exclusive: false,
257 is_cc: false,
258 disabled: false,
259 selectors: BTreeMap::default(),
260 },
261 );
262
263 inner.insert(
264 "free".to_string(),
265 ReplicaAllocation {
266 memory_limit: None,
267 cpu_limit: None,
268 disk_limit: None,
269 scale: 0,
270 workers: 0,
271 credits_per_hour: 0.into(),
272 cpu_exclusive: false,
273 is_cc: true,
274 disabled: true,
275 selectors: BTreeMap::default(),
276 },
277 );
278
279 for size in ["1cc", "1C"] {
280 inner.insert(
281 size.to_string(),
282 ReplicaAllocation {
283 memory_limit: None,
284 cpu_limit: None,
285 disk_limit: None,
286 scale: 1,
287 workers: 1,
288 credits_per_hour: 1.into(),
289 cpu_exclusive: false,
290 is_cc: true,
291 disabled: false,
292 selectors: BTreeMap::default(),
293 },
294 );
295 }
296
297 Self(inner)
298 }
299}
300
/// The account ID and external-ID prefix that
/// [`AwsPrincipalContext::to_principal_string`] combines with a per-item
/// suffix to render an AWS IAM role ARN.
#[derive(Debug, Clone, Serialize)]
pub struct AwsPrincipalContext {
    /// AWS account ID placed in the account position of the ARN.
    pub aws_account_id: String,
    /// Prefix placed in the role name, between `mz_` and the suffix.
    pub aws_external_id_prefix: AwsExternalIdPrefix,
}
310
311impl AwsPrincipalContext {
312 pub fn to_principal_string(&self, aws_external_id_suffix: CatalogItemId) -> String {
313 format!(
314 "arn:aws:iam::{}:role/mz_{}_{}",
315 self.aws_account_id, self.aws_external_id_prefix, aws_external_id_suffix
316 )
317 }
318}