// mz_catalog/config.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

10use std::collections::{BTreeMap, BTreeSet};
11use std::num::NonZero;
12
13use anyhow::bail;
14use bytesize::ByteSize;
15use ipnet::IpNet;
16use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
17use mz_auth::password::Password;
18use mz_build_info::BuildInfo;
19use mz_cloud_resources::AwsExternalIdPrefix;
20use mz_controller::clusters::ReplicaAllocation;
21use mz_license_keys::ValidatedLicenseKey;
22use mz_orchestrator::MemoryLimit;
23use mz_ore::cast::CastFrom;
24use mz_ore::metrics::MetricsRegistry;
25use mz_persist_client::PersistClient;
26use mz_repr::CatalogItemId;
27use mz_repr::adt::numeric::Numeric;
28use mz_sql::catalog::CatalogError as SqlCatalogError;
29use mz_sql::catalog::EnvironmentId;
30use serde::Serialize;
31
32use crate::durable::{CatalogError, DurableCatalogState};
33
/// Number of bytes in one gibibyte (2^30); used to express memory sizes in GiB.
const GIB: u64 = 1024 * 1024 * 1024;
35
/// Configures a catalog.
#[derive(Debug)]
pub struct Config<'a> {
    /// The connection to the catalog storage.
    pub storage: Box<dyn DurableCatalogState>,
    /// The registry that catalog uses to report metrics.
    pub metrics_registry: &'a MetricsRegistry,
    /// Configuration for the in-memory catalog state.
    pub state: StateConfig,
}
45
/// Configuration for the catalog state.
#[derive(Debug)]
pub struct StateConfig {
    /// Whether to enable unsafe mode.
    pub unsafe_mode: bool,
    /// Whether the build is a local dev build.
    // NOTE(review): the field name `all_features` suggests this also force-enables
    // all feature flags — confirm against where this field is read.
    pub all_features: bool,
    /// Information about this build of Materialize.
    pub build_info: &'static BuildInfo,
    /// A persistent ID associated with the environment.
    pub environment_id: EnvironmentId,
    /// Whether to start Materialize in read-only mode.
    pub read_only: bool,
    /// Function to generate wall clock now; can be mocked.
    pub now: mz_ore::now::NowFn,
    /// Linearizable timestamp of when this environment booted.
    pub boot_ts: mz_repr::Timestamp,
    /// Whether or not to skip catalog migrations.
    pub skip_migrations: bool,
    /// Map of strings to corresponding compute replica sizes.
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    /// Builtin system cluster config.
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    /// Builtin catalog server cluster config.
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    /// Builtin probe cluster config.
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    /// Builtin support cluster config.
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    /// Builtin analytics cluster config.
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    /// Dynamic defaults for system parameters.
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// An optional map of system parameters pulled from a remote frontend.
    /// A `None` value indicates that the initial sync was skipped.
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    /// Valid availability zones for replicas.
    pub availability_zones: Vec<String>,
    /// IP Addresses which will be used for egress.
    pub egress_addresses: Vec<IpNet>,
    /// Context for generating an AWS Principal.
    pub aws_principal_context: Option<AwsPrincipalContext>,
    /// Supported AWS PrivateLink availability zone ids.
    pub aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    /// Host name or URL for connecting to the HTTP server of this instance.
    pub http_host_name: Option<String>,
    /// Context for source and sink connections.
    pub connection_context: mz_storage_types::connections::ConnectionContext,
    /// Configuration for how builtin item migrations are performed.
    pub builtin_item_migration_config: BuiltinItemMigrationConfig,
    /// Handle to the persist client.
    pub persist_client: PersistClient,
    /// Overrides the current value of the [`mz_adapter_types::dyncfgs::ENABLE_EXPRESSION_CACHE`]
    /// feature flag.
    pub enable_expression_cache_override: Option<bool>,
    /// Helm chart version
    pub helm_chart_version: Option<String>,
    /// Optional password for the `mz_system` user.
    // NOTE(review): presumably used for external logins as `mz_system` — confirm with callers.
    pub external_login_password_mz_system: Option<Password>,
    /// The validated license key for this environment.
    pub license_key: ValidatedLicenseKey,
}
103
/// Configuration for migrating builtin items.
#[derive(Debug)]
pub struct BuiltinItemMigrationConfig {
    /// Persist client used by the migration.
    pub persist_client: PersistClient,
    /// Whether the environment is running in read-only mode.
    pub read_only: bool,
    /// If `Some`, force a migration.
    // NOTE(review): the semantics of the contained string are not visible in this
    // file — confirm against the migration code that reads it.
    pub force_migration: Option<String>,
}
110
/// A mapping from cluster replica size name to its resource allocation.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterReplicaSizeMap(pub BTreeMap<String, ReplicaAllocation>);
113
114impl ClusterReplicaSizeMap {
115    pub fn parse_from_str(s: &str, credit_consumption_from_memory: bool) -> anyhow::Result<Self> {
116        let mut cluster_replica_sizes: BTreeMap<String, ReplicaAllocation> =
117            serde_json::from_str(s)?;
118        if credit_consumption_from_memory {
119            for (name, replica) in cluster_replica_sizes.iter_mut() {
120                let Some(memory_limit) = replica.memory_limit else {
121                    bail!("No memory limit found in cluster definition for {name}");
122                };
123                let total_memory = memory_limit.0 * replica.scale.get();
124                replica.credits_per_hour = Numeric::from(total_memory.0) / Numeric::from(GIB);
125            }
126        }
127        Ok(Self(cluster_replica_sizes))
128    }
129
130    /// Iterate all enabled (not disabled) replica allocations, with their name.
131    pub fn enabled_allocations(&self) -> impl Iterator<Item = (&String, &ReplicaAllocation)> {
132        self.0.iter().filter(|(_, a)| !a.disabled)
133    }
134
135    /// Get a replica allocation by size name. Returns a reference to the allocation, or an
136    /// error if the size is unknown.
137    pub fn get_allocation_by_name(&self, name: &str) -> Result<&ReplicaAllocation, CatalogError> {
138        self.0.get(name).ok_or_else(|| {
139            CatalogError::Catalog(SqlCatalogError::UnknownClusterReplicaSize(name.into()))
140        })
141    }
142
143    /// Used for testing and local purposes. This default value should not be used in production.
144    ///
145    /// Credits per hour are calculated as being equal to scale. This is not necessarily how the
146    /// value is computed in production.
147    pub fn for_tests() -> Self {
148        // {
149        //     "scale=1,workers=1": {"scale": 1, "workers": 1},
150        //     "scale=1,workers=2": {"scale": 1, "workers": 2},
151        //     "scale=1,workers=4": {"scale": 1, "workers": 4},
152        //     /// ...
153        //     "scale=1,workers=32": {"scale": 1, "workers": 32}
154        //     /// Testing with multiple processes on a single machine
155        //     "scale=2,workers=4": {"scale": 2, "workers": 4},
156        //     /// Used in mzcompose tests
157        //     "scale=2,workers=2": {"scale": 2, "workers": 2},
158        //     ...
159        //     "scale=16,workers=16": {"scale": 16, "workers": 16},
160        //     /// Used in the shared_fate cloudtest tests
161        //     "scale=2,workers=1": {"scale": 2, "workers": 1},
162        //     ...
163        //     "scale=16,workers=1": {"scale": 16, "workers": 1},
164        //     /// Used in the cloudtest tests that force OOMs
165        //     "scale=1,workers=1,mem=2GiB": { "memory_limit": 2GiB },
166        //     ...
167        //     "scale=1,workers=1,mem=16": { "memory_limit": 16GiB },
168        // }
169        let mut inner = (0..=5)
170            .flat_map(|i| {
171                let workers = 1 << i;
172                [
173                    (format!("scale=1,workers={workers}"), None),
174                    (format!("scale=1,workers={workers},mem=4GiB"), Some(4)),
175                    (format!("scale=1,workers={workers},mem=8GiB"), Some(8)),
176                    (format!("scale=1,workers={workers},mem=16GiB"), Some(16)),
177                    (format!("scale=1,workers={workers},mem=32GiB"), Some(32)),
178                ]
179                .map(|(name, memory_limit)| {
180                    (
181                        name,
182                        ReplicaAllocation {
183                            memory_limit: memory_limit.map(|gib| MemoryLimit(ByteSize::gib(gib))),
184                            memory_request: memory_limit.map(|gib| MemoryLimit(ByteSize::gib(gib))),
185                            cpu_limit: None,
186                            cpu_request: None,
187                            disk_limit: None,
188                            scale: NonZero::new(1).expect("not zero"),
189                            workers: NonZero::new(workers).expect("not zero"),
190                            credits_per_hour: 1.into(),
191                            cpu_exclusive: false,
192                            is_cc: false,
193                            swap_enabled: false,
194                            disabled: false,
195                            selectors: BTreeMap::default(),
196                        },
197                    )
198                })
199            })
200            .collect::<BTreeMap<_, _>>();
201
202        for i in 1..=5 {
203            let scale = 1 << i;
204            inner.insert(
205                format!("scale={scale},workers=1"),
206                ReplicaAllocation {
207                    memory_limit: None,
208                    memory_request: None,
209                    cpu_limit: None,
210                    cpu_request: None,
211                    disk_limit: None,
212                    scale: NonZero::new(scale).expect("not zero"),
213                    workers: NonZero::new(1).expect("not zero"),
214                    credits_per_hour: scale.into(),
215                    cpu_exclusive: false,
216                    is_cc: false,
217                    swap_enabled: false,
218                    disabled: false,
219                    selectors: BTreeMap::default(),
220                },
221            );
222
223            inner.insert(
224                format!("scale={scale},workers={scale}"),
225                ReplicaAllocation {
226                    memory_limit: None,
227                    memory_request: None,
228                    cpu_limit: None,
229                    cpu_request: None,
230                    disk_limit: None,
231                    scale: NonZero::new(scale).expect("not zero"),
232                    workers: NonZero::new(scale.into()).expect("not zero"),
233                    credits_per_hour: scale.into(),
234                    cpu_exclusive: false,
235                    is_cc: false,
236                    swap_enabled: false,
237                    disabled: false,
238                    selectors: BTreeMap::default(),
239                },
240            );
241
242            inner.insert(
243                format!("scale=1,workers=8,mem={scale}GiB"),
244                ReplicaAllocation {
245                    memory_limit: Some(MemoryLimit(ByteSize(u64::cast_from(scale) * (1 << 30)))),
246                    memory_request: Some(MemoryLimit(ByteSize(u64::cast_from(scale) * (1 << 30)))),
247                    cpu_limit: None,
248                    cpu_request: None,
249                    disk_limit: None,
250                    scale: NonZero::new(1).expect("not zero"),
251                    workers: NonZero::new(8).expect("not zero"),
252                    credits_per_hour: 1.into(),
253                    cpu_exclusive: false,
254                    is_cc: false,
255                    swap_enabled: false,
256                    disabled: false,
257                    selectors: BTreeMap::default(),
258                },
259            );
260        }
261
262        inner.insert(
263            "scale=2,workers=4".to_string(),
264            ReplicaAllocation {
265                memory_limit: None,
266                memory_request: None,
267                cpu_limit: None,
268                cpu_request: None,
269                disk_limit: None,
270                scale: NonZero::new(2).expect("not zero"),
271                workers: NonZero::new(4).expect("not zero"),
272                credits_per_hour: 2.into(),
273                cpu_exclusive: false,
274                is_cc: false,
275                swap_enabled: false,
276                disabled: false,
277                selectors: BTreeMap::default(),
278            },
279        );
280
281        inner.insert(
282            "free".to_string(),
283            ReplicaAllocation {
284                memory_limit: None,
285                memory_request: None,
286                cpu_limit: None,
287                cpu_request: None,
288                disk_limit: None,
289                scale: NonZero::new(1).expect("not zero"),
290                workers: NonZero::new(1).expect("not zero"),
291                credits_per_hour: 0.into(),
292                cpu_exclusive: false,
293                is_cc: true,
294                swap_enabled: false,
295                disabled: true,
296                selectors: BTreeMap::default(),
297            },
298        );
299
300        Self(inner)
301    }
302}
303
/// Context used to generate an AWS Principal.
///
/// In the case of AWS PrivateLink connections, Materialize will connect to the
/// VPC endpoint as the AWS Principal generated via this context.
#[derive(Debug, Clone, Serialize)]
pub struct AwsPrincipalContext {
    /// The AWS account ID that appears in the generated principal ARN.
    pub aws_account_id: String,
    /// Prefix for the external ID embedded in the generated role name.
    pub aws_external_id_prefix: AwsExternalIdPrefix,
}
313
314impl AwsPrincipalContext {
315    pub fn to_principal_string(&self, aws_external_id_suffix: CatalogItemId) -> String {
316        format!(
317            "arn:aws:iam::{}:role/mz_{}_{}",
318            self.aws_account_id, self.aws_external_id_prefix, aws_external_id_suffix
319        )
320    }
321}
322
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // can't call foreign function `decContextDefault`
    fn cluster_replica_size_credits_from_memory() {
        // 1000MiB of memory at scale 2 => 2000MiB total => 2000/1024 credits per hour.
        let raw = r#"{
            "test": {
                "memory_limit": "1000MiB",
                "scale": 2,
                "workers": 10,
                "credits_per_hour": "0"
            }
        }"#;
        let sizes = ClusterReplicaSizeMap::parse_from_str(raw, true).unwrap();

        let allocation = sizes.get_allocation_by_name("test").unwrap();
        assert_eq!(
            allocation.credits_per_hour,
            Numeric::from(2000) / Numeric::from(1024)
        );
    }
}
345}