// mz_catalog/config.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11
12use anyhow::bail;
13use bytesize::ByteSize;
14use ipnet::IpNet;
15use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
16use mz_auth::password::Password;
17use mz_build_info::BuildInfo;
18use mz_cloud_resources::AwsExternalIdPrefix;
19use mz_controller::clusters::ReplicaAllocation;
20use mz_license_keys::ValidatedLicenseKey;
21use mz_orchestrator::MemoryLimit;
22use mz_ore::cast::CastFrom;
23use mz_ore::metrics::MetricsRegistry;
24use mz_persist_client::PersistClient;
25use mz_repr::CatalogItemId;
26use mz_repr::adt::numeric::Numeric;
27use mz_sql::catalog::CatalogError as SqlCatalogError;
28use mz_sql::catalog::EnvironmentId;
29use serde::Serialize;
30
31use crate::durable::{CatalogError, DurableCatalogState};
32
/// One gibibyte, in bytes. Used by [`ClusterReplicaSizeMap::parse_from_str`]
/// to express memory-derived credit consumption in units of GiB.
const GIB: u64 = 1024 * 1024 * 1024;
34
/// Configures a catalog.
#[derive(Debug)]
pub struct Config<'a> {
    /// The connection to the catalog storage.
    pub storage: Box<dyn DurableCatalogState>,
    /// The registry that catalog uses to report metrics.
    pub metrics_registry: &'a MetricsRegistry,
    /// Configuration for the catalog state.
    pub state: StateConfig,
}
44
#[derive(Debug)]
pub struct StateConfig {
    /// Whether to enable unsafe mode.
    pub unsafe_mode: bool,
    /// Whether the build is a local dev build.
    pub all_features: bool,
    /// Information about this build of Materialize.
    pub build_info: &'static BuildInfo,
    /// A persistent ID associated with the environment.
    pub environment_id: EnvironmentId,
    /// Whether to start Materialize in read-only mode.
    pub read_only: bool,
    /// Function to generate wall clock now; can be mocked.
    pub now: mz_ore::now::NowFn,
    /// Linearizable timestamp of when this environment booted.
    pub boot_ts: mz_repr::Timestamp,
    /// Whether or not to skip catalog migrations.
    pub skip_migrations: bool,
    /// Map of strings to corresponding compute replica sizes.
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    /// Builtin system cluster config.
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    /// Builtin catalog server cluster config.
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    /// Builtin probe cluster config.
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    /// Builtin support cluster config.
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    /// Builtin analytics cluster config.
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    /// Dynamic defaults for system parameters.
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// An optional map of system parameters pulled from a remote frontend.
    /// A `None` value indicates that the initial sync was skipped.
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    /// Valid availability zones for replicas.
    pub availability_zones: Vec<String>,
    /// IP Addresses which will be used for egress.
    pub egress_addresses: Vec<IpNet>,
    /// Context for generating an AWS Principal.
    pub aws_principal_context: Option<AwsPrincipalContext>,
    /// Supported AWS PrivateLink availability zone ids.
    pub aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    /// Host name or URL for connecting to the HTTP server of this instance.
    pub http_host_name: Option<String>,
    /// Context for source and sink connections.
    pub connection_context: mz_storage_types::connections::ConnectionContext,
    /// Configuration for migrating builtin items.
    pub builtin_item_migration_config: BuiltinItemMigrationConfig,
    /// Client for connecting to persist.
    pub persist_client: PersistClient,
    /// Overrides the current value of the [`mz_adapter_types::dyncfgs::ENABLE_EXPRESSION_CACHE`]
    /// feature flag.
    pub enable_expression_cache_override: Option<bool>,
    /// Whether to enable zero-downtime deployments.
    pub enable_0dt_deployment: bool,
    /// Helm chart version
    pub helm_chart_version: Option<String>,
    /// Optional external login password for the `mz_system` user.
    pub external_login_password_mz_system: Option<Password>,
    /// The validated license key for this environment.
    pub license_key: ValidatedLicenseKey,
}
104
/// Configuration used when migrating builtin items.
#[derive(Debug)]
pub struct BuiltinItemMigrationConfig {
    /// Client for connecting to persist.
    pub persist_client: PersistClient,
    /// Whether Materialize is running in read-only mode.
    pub read_only: bool,
}
110
/// Maps cluster replica size names to their resource allocations.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterReplicaSizeMap(pub BTreeMap<String, ReplicaAllocation>);
113
114impl ClusterReplicaSizeMap {
115    pub fn parse_from_str(s: &str, credit_consumption_from_memory: bool) -> anyhow::Result<Self> {
116        let mut cluster_replica_sizes: BTreeMap<String, ReplicaAllocation> =
117            serde_json::from_str(s)?;
118        if credit_consumption_from_memory {
119            for (name, replica) in cluster_replica_sizes.iter_mut() {
120                let Some(memory_limit) = replica.memory_limit else {
121                    bail!("No memory limit found in cluster definition for {name}");
122                };
123                replica.credits_per_hour = Numeric::from(
124                    (memory_limit.0 * replica.scale * u64::try_from(replica.workers)?).0,
125                ) / Numeric::from(1 * GIB);
126            }
127        }
128        Ok(Self(cluster_replica_sizes))
129    }
130
131    /// Iterate all enabled (not disabled) replica allocations, with their name.
132    pub fn enabled_allocations(&self) -> impl Iterator<Item = (&String, &ReplicaAllocation)> {
133        self.0.iter().filter(|(_, a)| !a.disabled)
134    }
135
136    /// Get a replica allocation by size name. Returns a reference to the allocation, or an
137    /// error if the size is unknown.
138    pub fn get_allocation_by_name(&self, name: &str) -> Result<&ReplicaAllocation, CatalogError> {
139        self.0.get(name).ok_or_else(|| {
140            CatalogError::Catalog(SqlCatalogError::UnknownClusterReplicaSize(name.into()))
141        })
142    }
143
144    /// Used for testing and local purposes. This default value should not be used in production.
145    ///
146    /// Credits per hour are calculated as being equal to scale. This is not necessarily how the
147    /// value is computed in production.
148    pub fn for_tests() -> Self {
149        // {
150        //     "scale=1,workers=1": {"scale": 1, "workers": 1},
151        //     "scale=1,workers=2": {"scale": 1, "workers": 2},
152        //     "scale=1,workers=4": {"scale": 1, "workers": 4},
153        //     /// ...
154        //     "scale=1,workers=32": {"scale": 1, "workers": 32}
155        //     /// Testing with multiple processes on a single machine
156        //     "scale=2,workers=4": {"scale": 2, "workers": 4},
157        //     /// Used in mzcompose tests
158        //     "scale=2,workers=2": {"scale": 2, "workers": 2},
159        //     ...
160        //     "scale=16,workers=16": {"scale": 16, "workers": 16},
161        //     /// Used in the shared_fate cloudtest tests
162        //     "scale=2,workers=1": {"scale": 2, "workers": 1},
163        //     ...
164        //     "scale=16,workers=1": {"scale": 16, "workers": 1},
165        //     /// Used in the cloudtest tests that force OOMs
166        //     "scale=1,workers=1,mem=2GiB": { "memory_limit": 2GiB },
167        //     ...
168        //     "scale=1,workers=1,mem=16": { "memory_limit": 16GiB },
169        // }
170        let mut inner = (0..=5)
171            .flat_map(|i| {
172                let workers: u8 = 1 << i;
173                [
174                    (format!("scale=1,workers={workers}"), None),
175                    (format!("scale=1,workers={workers},mem=4GiB"), Some(4)),
176                    (format!("scale=1,workers={workers},mem=8GiB"), Some(8)),
177                    (format!("scale=1,workers={workers},mem=16GiB"), Some(16)),
178                    (format!("scale=1,workers={workers},mem=32GiB"), Some(32)),
179                ]
180                .map(|(name, memory_limit)| {
181                    (
182                        name,
183                        ReplicaAllocation {
184                            memory_limit: memory_limit.map(|gib| MemoryLimit(ByteSize::gib(gib))),
185                            cpu_limit: None,
186                            disk_limit: None,
187                            scale: 1,
188                            workers: workers.into(),
189                            credits_per_hour: 1.into(),
190                            cpu_exclusive: false,
191                            is_cc: false,
192                            swap_enabled: false,
193                            disabled: false,
194                            selectors: BTreeMap::default(),
195                        },
196                    )
197                })
198            })
199            .collect::<BTreeMap<_, _>>();
200
201        for i in 1..=5 {
202            let scale = 1 << i;
203            inner.insert(
204                format!("scale={scale},workers=1"),
205                ReplicaAllocation {
206                    memory_limit: None,
207                    cpu_limit: None,
208                    disk_limit: None,
209                    scale,
210                    workers: 1,
211                    credits_per_hour: scale.into(),
212                    cpu_exclusive: false,
213                    is_cc: false,
214                    swap_enabled: false,
215                    disabled: false,
216                    selectors: BTreeMap::default(),
217                },
218            );
219
220            inner.insert(
221                format!("scale={scale},workers={scale}"),
222                ReplicaAllocation {
223                    memory_limit: None,
224                    cpu_limit: None,
225                    disk_limit: None,
226                    scale,
227                    workers: scale.into(),
228                    credits_per_hour: scale.into(),
229                    cpu_exclusive: false,
230                    is_cc: false,
231                    swap_enabled: false,
232                    disabled: false,
233                    selectors: BTreeMap::default(),
234                },
235            );
236
237            inner.insert(
238                format!("scale=1,workers=8,mem={scale}GiB"),
239                ReplicaAllocation {
240                    memory_limit: Some(MemoryLimit(ByteSize(u64::cast_from(scale) * (1 << 30)))),
241                    cpu_limit: None,
242                    disk_limit: None,
243                    scale: 1,
244                    workers: 8,
245                    credits_per_hour: 1.into(),
246                    cpu_exclusive: false,
247                    is_cc: false,
248                    swap_enabled: false,
249                    disabled: false,
250                    selectors: BTreeMap::default(),
251                },
252            );
253        }
254
255        inner.insert(
256            "scale=2,workers=4".to_string(),
257            ReplicaAllocation {
258                memory_limit: None,
259                cpu_limit: None,
260                disk_limit: None,
261                scale: 2,
262                workers: 4,
263                credits_per_hour: 2.into(),
264                cpu_exclusive: false,
265                is_cc: false,
266                swap_enabled: false,
267                disabled: false,
268                selectors: BTreeMap::default(),
269            },
270        );
271
272        inner.insert(
273            "free".to_string(),
274            ReplicaAllocation {
275                memory_limit: None,
276                cpu_limit: None,
277                disk_limit: None,
278                scale: 0,
279                workers: 0,
280                credits_per_hour: 0.into(),
281                cpu_exclusive: false,
282                is_cc: true,
283                swap_enabled: false,
284                disabled: true,
285                selectors: BTreeMap::default(),
286            },
287        );
288
289        Self(inner)
290    }
291}
292
/// Context used to generate an AWS Principal.
///
/// In the case of AWS PrivateLink connections, Materialize will connect to the
/// VPC endpoint as the AWS Principal generated via this context.
#[derive(Debug, Clone, Serialize)]
pub struct AwsPrincipalContext {
    /// The AWS account ID used in the generated principal ARN.
    pub aws_account_id: String,
    /// Prefix for the external ID embedded in the generated role name.
    pub aws_external_id_prefix: AwsExternalIdPrefix,
}
302
303impl AwsPrincipalContext {
304    pub fn to_principal_string(&self, aws_external_id_suffix: CatalogItemId) -> String {
305        format!(
306            "arn:aws:iam::{}:role/mz_{}_{}",
307            self.aws_account_id, self.aws_external_id_prefix, aws_external_id_suffix
308        )
309    }
310}