mz_catalog/
config.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11
12use anyhow::bail;
13use bytesize::ByteSize;
14use ipnet::IpNet;
15use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
16use mz_build_info::BuildInfo;
17use mz_cloud_resources::AwsExternalIdPrefix;
18use mz_controller::clusters::ReplicaAllocation;
19use mz_orchestrator::MemoryLimit;
20use mz_ore::cast::CastFrom;
21use mz_ore::metrics::MetricsRegistry;
22use mz_persist_client::PersistClient;
23use mz_repr::CatalogItemId;
24use mz_repr::adt::numeric::Numeric;
25use mz_sql::catalog::CatalogError as SqlCatalogError;
26use mz_sql::catalog::EnvironmentId;
27use serde::Serialize;
28
29use crate::durable::{CatalogError, DurableCatalogState};
30
/// The number of bytes in one gibibyte (GiB).
const GIB: u64 = 1024 * 1024 * 1024;
32
/// Configures a catalog.
#[derive(Debug)]
pub struct Config<'a> {
    /// The connection to the catalog storage.
    pub storage: Box<dyn DurableCatalogState>,
    /// The registry that catalog uses to report metrics.
    pub metrics_registry: &'a MetricsRegistry,
    /// Configuration for the catalog state (see [`StateConfig`]).
    pub state: StateConfig,
}
42
/// Configuration for the catalog state.
#[derive(Debug)]
pub struct StateConfig {
    /// Whether to enable unsafe mode.
    pub unsafe_mode: bool,
    /// Whether to enable all feature flags — presumably set for local dev
    /// builds (NOTE(review): confirm exact semantics with callers).
    pub all_features: bool,
    /// Information about this build of Materialize.
    pub build_info: &'static BuildInfo,
    /// A persistent ID associated with the environment.
    pub environment_id: EnvironmentId,
    /// Whether to start Materialize in read-only mode.
    pub read_only: bool,
    /// Function to generate wall clock now; can be mocked.
    pub now: mz_ore::now::NowFn,
    /// Linearizable timestamp of when this environment booted.
    pub boot_ts: mz_repr::Timestamp,
    /// Whether or not to skip catalog migrations.
    pub skip_migrations: bool,
    /// Map of strings to corresponding compute replica sizes.
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    /// Builtin system cluster config.
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    /// Builtin catalog server cluster config.
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    /// Builtin probe cluster config.
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    /// Builtin support cluster config.
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    /// Builtin analytics cluster config.
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    /// Dynamic defaults for system parameters.
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// An optional map of system parameters pulled from a remote frontend.
    /// A `None` value indicates that the initial sync was skipped.
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    /// Valid availability zones for replicas.
    pub availability_zones: Vec<String>,
    /// IP Addresses which will be used for egress.
    pub egress_addresses: Vec<IpNet>,
    /// Context for generating an AWS Principal.
    pub aws_principal_context: Option<AwsPrincipalContext>,
    /// Supported AWS PrivateLink availability zone ids.
    pub aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    /// Host name or URL for connecting to the HTTP server of this instance.
    pub http_host_name: Option<String>,
    /// Context for source and sink connections.
    pub connection_context: mz_storage_types::connections::ConnectionContext,
    /// Configuration for the builtin item migration.
    pub builtin_item_migration_config: BuiltinItemMigrationConfig,
    /// Client for the persist storage layer.
    pub persist_client: PersistClient,
    /// Overrides the current value of the [`mz_adapter_types::dyncfgs::ENABLE_EXPRESSION_CACHE`]
    /// feature flag.
    pub enable_expression_cache_override: Option<bool>,
    /// Whether to enable zero-downtime deployments.
    pub enable_0dt_deployment: bool,
    /// Version of the Helm chart used to deploy this environment, if any.
    pub helm_chart_version: Option<String>,
}
100
/// Configuration for the builtin item migration.
#[derive(Debug)]
pub struct BuiltinItemMigrationConfig {
    /// Client for the persist storage layer.
    pub persist_client: PersistClient,
    /// Whether the environment is running in read-only mode (mirrors
    /// [`StateConfig::read_only`]).
    pub read_only: bool,
}
106
/// A map from cluster replica size name to the corresponding resource
/// allocation.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterReplicaSizeMap(pub BTreeMap<String, ReplicaAllocation>);
109
110impl ClusterReplicaSizeMap {
111    pub fn parse_from_str(s: &str, credit_consumption_from_memory: bool) -> anyhow::Result<Self> {
112        let mut cluster_replica_sizes: BTreeMap<String, ReplicaAllocation> =
113            serde_json::from_str(s)?;
114        if credit_consumption_from_memory {
115            for (name, replica) in cluster_replica_sizes.iter_mut() {
116                let Some(memory_limit) = replica.memory_limit else {
117                    bail!("No memory limit found in cluster definition for {name}");
118                };
119                replica.credits_per_hour = Numeric::from(
120                    (memory_limit.0 * replica.scale * u64::try_from(replica.workers)?).0,
121                ) / Numeric::from(1 * GIB);
122            }
123        }
124        Ok(Self(cluster_replica_sizes))
125    }
126
127    /// Iterate all enabled (not disabled) replica allocations, with their name.
128    pub fn enabled_allocations(&self) -> impl Iterator<Item = (&String, &ReplicaAllocation)> {
129        self.0.iter().filter(|(_, a)| !a.disabled)
130    }
131
132    /// Get a replica allocation by size name. Returns a reference to the allocation, or an
133    /// error if the size is unknown.
134    pub fn get_allocation_by_name(&self, name: &str) -> Result<&ReplicaAllocation, CatalogError> {
135        self.0.get(name).ok_or_else(|| {
136            CatalogError::Catalog(SqlCatalogError::UnknownClusterReplicaSize(name.into()))
137        })
138    }
139
140    /// Used for testing and local purposes. This default value should not be used in production.
141    ///
142    /// Credits per hour are calculated as being equal to scale. This is not necessarily how the
143    /// value is computed in production.
144    pub fn for_tests() -> Self {
145        // {
146        //     "1": {"scale": 1, "workers": 1},
147        //     "2": {"scale": 1, "workers": 2},
148        //     "4": {"scale": 1, "workers": 4},
149        //     /// ...
150        //     "32": {"scale": 1, "workers": 32}
151        //     /// Testing with multiple processes on a single machine
152        //     "2-4": {"scale": 2, "workers": 4},
153        //     /// Used in mzcompose tests
154        //     "2-2": {"scale": 2, "workers": 2},
155        //     ...
156        //     "16-16": {"scale": 16, "workers": 16},
157        //     /// Used in the shared_fate cloudtest tests
158        //     "2-1": {"scale": 2, "workers": 1},
159        //     ...
160        //     "16-1": {"scale": 16, "workers": 1},
161        //     /// Used in the cloudtest tests that force OOMs
162        //     "mem-2": { "memory_limit": 2Gb },
163        //     ...
164        //     "mem-16": { "memory_limit": 16Gb },
165        // }
166        let mut inner = (0..=5)
167            .flat_map(|i| {
168                let workers: u8 = 1 << i;
169                [
170                    (workers.to_string(), None),
171                    (format!("{workers}-4G"), Some(4)),
172                    (format!("{workers}-8G"), Some(8)),
173                    (format!("{workers}-16G"), Some(16)),
174                    (format!("{workers}-32G"), Some(32)),
175                ]
176                .map(|(name, memory_limit)| {
177                    (
178                        name,
179                        ReplicaAllocation {
180                            memory_limit: memory_limit.map(|gib| MemoryLimit(ByteSize::gib(gib))),
181                            cpu_limit: None,
182                            disk_limit: None,
183                            scale: 1,
184                            workers: workers.into(),
185                            credits_per_hour: 1.into(),
186                            cpu_exclusive: false,
187                            is_cc: false,
188                            disabled: false,
189                            selectors: BTreeMap::default(),
190                        },
191                    )
192                })
193            })
194            .collect::<BTreeMap<_, _>>();
195
196        for i in 1..=5 {
197            let scale = 1 << i;
198            inner.insert(
199                format!("{scale}-1"),
200                ReplicaAllocation {
201                    memory_limit: None,
202                    cpu_limit: None,
203                    disk_limit: None,
204                    scale,
205                    workers: 1,
206                    credits_per_hour: scale.into(),
207                    cpu_exclusive: false,
208                    is_cc: false,
209                    disabled: false,
210                    selectors: BTreeMap::default(),
211                },
212            );
213
214            inner.insert(
215                format!("{scale}-{scale}"),
216                ReplicaAllocation {
217                    memory_limit: None,
218                    cpu_limit: None,
219                    disk_limit: None,
220                    scale,
221                    workers: scale.into(),
222                    credits_per_hour: scale.into(),
223                    cpu_exclusive: false,
224                    is_cc: false,
225                    disabled: false,
226                    selectors: BTreeMap::default(),
227                },
228            );
229
230            inner.insert(
231                format!("mem-{scale}"),
232                ReplicaAllocation {
233                    memory_limit: Some(MemoryLimit(ByteSize(u64::cast_from(scale) * (1 << 30)))),
234                    cpu_limit: None,
235                    disk_limit: None,
236                    scale: 1,
237                    workers: 8,
238                    credits_per_hour: 1.into(),
239                    cpu_exclusive: false,
240                    is_cc: false,
241                    disabled: false,
242                    selectors: BTreeMap::default(),
243                },
244            );
245        }
246
247        inner.insert(
248            "2-4".to_string(),
249            ReplicaAllocation {
250                memory_limit: None,
251                cpu_limit: None,
252                disk_limit: None,
253                scale: 2,
254                workers: 4,
255                credits_per_hour: 2.into(),
256                cpu_exclusive: false,
257                is_cc: false,
258                disabled: false,
259                selectors: BTreeMap::default(),
260            },
261        );
262
263        inner.insert(
264            "free".to_string(),
265            ReplicaAllocation {
266                memory_limit: None,
267                cpu_limit: None,
268                disk_limit: None,
269                scale: 0,
270                workers: 0,
271                credits_per_hour: 0.into(),
272                cpu_exclusive: false,
273                is_cc: true,
274                disabled: true,
275                selectors: BTreeMap::default(),
276            },
277        );
278
279        for size in ["1cc", "1C"] {
280            inner.insert(
281                size.to_string(),
282                ReplicaAllocation {
283                    memory_limit: None,
284                    cpu_limit: None,
285                    disk_limit: None,
286                    scale: 1,
287                    workers: 1,
288                    credits_per_hour: 1.into(),
289                    cpu_exclusive: false,
290                    is_cc: true,
291                    disabled: false,
292                    selectors: BTreeMap::default(),
293                },
294            );
295        }
296
297        Self(inner)
298    }
299}
300
/// Context used to generate an AWS Principal.
///
/// In the case of AWS PrivateLink connections, Materialize will connect to the
/// VPC endpoint as the AWS Principal generated via this context.
#[derive(Debug, Clone, Serialize)]
pub struct AwsPrincipalContext {
    /// The AWS account ID under which the principal's IAM role lives.
    pub aws_account_id: String,
    /// Prefix placed before the per-item suffix in the generated role name.
    pub aws_external_id_prefix: AwsExternalIdPrefix,
}
310
311impl AwsPrincipalContext {
312    pub fn to_principal_string(&self, aws_external_id_suffix: CatalogItemId) -> String {
313        format!(
314            "arn:aws:iam::{}:role/mz_{}_{}",
315            self.aws_account_id, self.aws_external_id_prefix, aws_external_id_suffix
316        )
317    }
318}