1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::{BTreeMap, BTreeSet};
use std::net::Ipv4Addr;
use std::sync::Arc;
use std::time::Duration;

use bytesize::ByteSize;
use mz_build_info::BuildInfo;
use mz_catalog;
use mz_cloud_resources::AwsExternalIdPrefix;
use mz_controller::clusters::ReplicaAllocation;
use mz_orchestrator::MemoryLimit;
use mz_ore::cast::CastFrom;
use mz_ore::metrics::MetricsRegistry;
use mz_repr::GlobalId;
use mz_secrets::SecretsReader;
use mz_sql::catalog::EnvironmentId;
use mz_sql::session::vars::ConnectionCounter;
use serde::{Deserialize, Serialize};

use crate::config::SystemParameterSyncConfig;

/// Configures a catalog.
#[derive(Debug)]
pub struct Config<'a> {
    /// Handle to the durable catalog state backing this catalog.
    pub storage: Box<dyn mz_catalog::DurableCatalogState>,
    /// Whether to enable unsafe mode.
    pub unsafe_mode: bool,
    /// Whether the build is a local dev build.
    ///
    /// NOTE(review): the field name suggests this enables all (feature-flagged)
    /// features; confirm this description against the callers that set it.
    pub all_features: bool,
    /// Information about this build of Materialize.
    pub build_info: &'static BuildInfo,
    /// A persistent ID associated with the environment.
    pub environment_id: EnvironmentId,
    /// Function to generate wall clock now; can be mocked.
    pub now: mz_ore::now::NowFn,
    /// Whether or not to skip catalog migrations.
    pub skip_migrations: bool,
    /// The registry that catalog uses to report metrics.
    pub metrics_registry: &'a MetricsRegistry,
    /// Map of strings to corresponding compute replica sizes.
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    /// Default storage cluster size. Must be a key from cluster_replica_sizes.
    pub default_storage_cluster_size: Option<String>,
    /// Dynamic defaults for system parameters, keyed by parameter name.
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Valid availability zones for replicas.
    pub availability_zones: Vec<String>,
    /// A handle to a secrets manager that can only read secrets.
    pub secrets_reader: Arc<dyn SecretsReader>,
    /// IP Addresses which will be used for egress.
    pub egress_ips: Vec<Ipv4Addr>,
    /// Context for generating an AWS Principal.
    pub aws_principal_context: Option<AwsPrincipalContext>,
    /// Supported AWS PrivateLink availability zone ids.
    pub aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    /// An optional frontend used to pull system parameters for initial sync in
    /// Catalog::open. A `None` value indicates that the initial sync should be
    /// skipped.
    pub system_parameter_sync_config: Option<SystemParameterSyncConfig>,
    /// How long to retain storage usage records.
    pub storage_usage_retention_period: Option<Duration>,
    /// Host name or URL for connecting to the HTTP server of this instance.
    pub http_host_name: Option<String>,
    /// Needed only for migrating PG source column metadata. If `None`, will
    /// skip any migrations that require it, which will likely cause tests to
    /// fail.
    ///
    /// TODO(migration): delete in version v.51 (released in v0.49 + 1
    /// additional release)
    pub connection_context: Option<mz_storage_types::connections::ConnectionContext>,
    /// Global connection limit and count, shared behind a mutex.
    pub active_connection_count: Arc<std::sync::Mutex<ConnectionCounter>>,
}

/// Maps replica size names (e.g. `"1"`, `"2-4"`) to the resources allocated
/// for a replica of that size.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ClusterReplicaSizeMap(pub BTreeMap<String, ReplicaAllocation>);

impl ClusterReplicaSizeMap {
    /// Yields every `(size name, allocation)` pair whose allocation is not
    /// marked `disabled`, in ascending key order of the underlying map.
    pub fn enabled_allocations(&self) -> impl Iterator<Item = (&String, &ReplicaAllocation)> {
        let Self(sizes) = self;
        sizes.iter().filter(|&(_name, alloc)| !alloc.disabled)
    }
}

impl Default for ClusterReplicaSizeMap {
    // Used for testing and local purposes. This default value should not be used in production.
    //
    // Credits per hour are calculated as being equal to scale. This is not necessarily how the
    // value is computed in production.
    fn default() -> Self {
        // The generated sizes are:
        //
        // {
        //     // Single process with 2^i workers, for i in 0..=5:
        //     "1": {"scale": 1, "workers": 1},
        //     "2": {"scale": 1, "workers": 2},
        //     "4": {"scale": 1, "workers": 4},
        //     ...
        //     "32": {"scale": 1, "workers": 32},
        //     // Testing with multiple processes on a single machine:
        //     "2-4": {"scale": 2, "workers": 4},
        //     // Used in mzcompose tests (scale = workers = 2^i, for i in 1..=5):
        //     "2-2": {"scale": 2, "workers": 2},
        //     ...
        //     "32-32": {"scale": 32, "workers": 32},
        //     // Used in the shared_fate cloudtest tests (scale = 2^i, one worker each):
        //     "2-1": {"scale": 2, "workers": 1},
        //     ...
        //     "32-1": {"scale": 32, "workers": 1},
        //     // Used in the cloudtest tests that force OOMs (scale 1, 8 workers):
        //     "mem-2": {"memory_limit": 2GiB},
        //     ...
        //     "mem-32": {"memory_limit": 32GiB},
        //     // Disabled entry with no resources and no credits:
        //     "free": {"scale": 0, "workers": 0, "disabled": true},
        // }
        let mut inner = (0..=5)
            .map(|i| {
                let workers: u8 = 1 << i;
                (
                    workers.to_string(),
                    ReplicaAllocation {
                        memory_limit: None,
                        cpu_limit: None,
                        disk_limit: None,
                        scale: 1,
                        workers: workers.into(),
                        credits_per_hour: 1.into(),
                        disabled: false,
                    },
                )
            })
            .collect::<BTreeMap<_, _>>();

        // Multi-process sizes; `scale` runs over 2, 4, 8, 16, 32.
        for i in 1..=5 {
            let scale = 1 << i;
            inner.insert(
                format!("{scale}-1"),
                ReplicaAllocation {
                    memory_limit: None,
                    cpu_limit: None,
                    disk_limit: None,
                    scale,
                    workers: 1,
                    credits_per_hour: scale.into(),
                    disabled: false,
                },
            );

            inner.insert(
                format!("{scale}-{scale}"),
                ReplicaAllocation {
                    memory_limit: None,
                    cpu_limit: None,
                    disk_limit: None,
                    scale,
                    workers: scale.into(),
                    credits_per_hour: scale.into(),
                    disabled: false,
                },
            );

            inner.insert(
                format!("mem-{scale}"),
                ReplicaAllocation {
                    // `scale` GiB (1 << 30 bytes per GiB).
                    memory_limit: Some(MemoryLimit(ByteSize(u64::cast_from(scale) * (1 << 30)))),
                    cpu_limit: None,
                    disk_limit: None,
                    scale: 1,
                    workers: 8,
                    credits_per_hour: 1.into(),
                    disabled: false,
                },
            );
        }

        inner.insert(
            "2-4".to_string(),
            ReplicaAllocation {
                memory_limit: None,
                cpu_limit: None,
                disk_limit: None,
                scale: 2,
                workers: 4,
                credits_per_hour: 2.into(),
                disabled: false,
            },
        );

        // Present in the map but filtered out by `enabled_allocations`.
        inner.insert(
            "free".to_string(),
            ReplicaAllocation {
                memory_limit: None,
                cpu_limit: None,
                disk_limit: None,
                scale: 0,
                workers: 0,
                credits_per_hour: 0.into(),
                disabled: true,
            },
        );
        Self(inner)
    }
}

/// Context used to generate an AWS Principal.
///
/// In the case of AWS PrivateLink connections, Materialize will connect to the
/// VPC endpoint as the AWS Principal generated via this context.
#[derive(Debug, Clone, Serialize)]
pub struct AwsPrincipalContext {
    /// AWS account ID placed in the account segment of the generated role ARN.
    pub aws_account_id: String,
    /// Prefix placed before the per-call suffix in the generated role name.
    pub aws_external_id_prefix: AwsExternalIdPrefix,
}

impl AwsPrincipalContext {
    /// Renders the IAM role ARN
    /// `arn:aws:iam::<aws_account_id>:role/mz_<aws_external_id_prefix>_<aws_external_id_suffix>`,
    /// formatting each component with its `Display` implementation.
    pub fn to_principal_string(&self, aws_external_id_suffix: GlobalId) -> String {
        let Self {
            aws_account_id: account,
            aws_external_id_prefix: prefix,
        } = self;
        format!("arn:aws:iam::{account}:role/mz_{prefix}_{aws_external_id_suffix}")
    }
}