mz_catalog/durable/
initialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::iter;
12use std::str::FromStr;
13use std::sync::LazyLock;
14use std::time::Duration;
15
16use base64::prelude::*;
17use ipnet::IpNet;
18use itertools::max;
19use mz_audit_log::{CreateOrDropClusterReplicaReasonV1, EventV1, VersionedEvent};
20use mz_controller::clusters::ReplicaLogging;
21use mz_controller_types::{ClusterId, ReplicaId};
22use mz_ore::collections::HashSet;
23use mz_ore::now::EpochMillis;
24use mz_persist_types::ShardId;
25use mz_pgrepr::oid::{
26    FIRST_USER_OID, NETWORK_POLICIES_DEFAULT_POLICY_OID, ROLE_PUBLIC_OID,
27    SCHEMA_INFORMATION_SCHEMA_OID, SCHEMA_MZ_CATALOG_OID, SCHEMA_MZ_CATALOG_UNSTABLE_OID,
28    SCHEMA_MZ_INTERNAL_OID, SCHEMA_MZ_INTROSPECTION_OID, SCHEMA_MZ_UNSAFE_OID,
29    SCHEMA_PG_CATALOG_OID,
30};
31use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
32use mz_repr::network_policy_id::NetworkPolicyId;
33use mz_repr::role_id::RoleId;
34use mz_sql::catalog::{
35    DefaultPrivilegeAclItem, DefaultPrivilegeObject, ObjectType, RoleAttributesRaw, RoleMembership,
36    RoleVars, SystemObjectType,
37};
38use mz_sql::names::{
39    DatabaseId, ObjectId, PUBLIC_ROLE_NAME, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier,
40};
41use mz_sql::plan::{NetworkPolicyRule, PolicyAddress};
42use mz_sql::rbac;
43use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
44
45use crate::builtin::BUILTIN_ROLES;
46use crate::durable::upgrade::CATALOG_VERSION;
47use crate::durable::{
48    AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, BootstrapArgs,
49    CATALOG_CONTENT_VERSION_KEY, CatalogError, ClusterConfig, ClusterVariant,
50    ClusterVariantManaged, DATABASE_ID_ALLOC_KEY, DefaultPrivilege, EXPRESSION_CACHE_SHARD_KEY,
51    MOCK_AUTHENTICATION_NONCE_KEY, OID_ALLOC_KEY, ReplicaConfig, ReplicaLocation, Role,
52    SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY,
53    SYSTEM_REPLICA_ID_ALLOC_KEY, Schema, Transaction, USER_CLUSTER_ID_ALLOC_KEY,
54    USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY,
55};
56
/// The key within the "config" collection that stores the version of the catalog.
///
/// `initialize` writes `CATALOG_VERSION` under this key.
pub const USER_VERSION_KEY: &str = "user_version";
/// The key within the "config" collection that stores whether the remote configuration was
/// synchronized at least once.
///
/// `initialize` writes `0` under this key.
pub(crate) const SYSTEM_CONFIG_SYNCED_KEY: &str = "system_config_synced";

/// The key used within the "config" collection where we store a mirror of the
/// `with_0dt_deployment_max_wait` "system var" value. This is mirrored so that
/// we can toggle the flag with LaunchDarkly, but use it in boot before
/// LaunchDarkly is available.
///
/// NOTE: The `with_` prefix looks odd, but the name can't start with a `0`.
pub(crate) const WITH_0DT_DEPLOYMENT_MAX_WAIT: &str = "with_0dt_deployment_max_wait";

/// The key used within the "config" collection where we store a mirror of the
/// `with_0dt_deployment_ddl_check_interval` "system var" value. This is
/// mirrored so that we can toggle the flag with LaunchDarkly, but use it in
/// boot before LaunchDarkly is available.
///
/// NOTE: The `with_` prefix looks odd, but the name can't start with a `0`.
pub(crate) const WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL: &str =
    "with_0dt_deployment_ddl_check_interval";

/// The key used within the "config" collection where we store a mirror of the
/// `enable_0dt_deployment_panic_after_timeout` "system var" value. This is
/// mirrored so that we can toggle the flag with LaunchDarkly, but use it in
/// boot before LaunchDarkly is available.
pub(crate) const ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT: &str =
    "enable_0dt_deployment_panic_after_timeout";
86
/// Name of the "user" ID allocator; `initialize` seeds it with `DEFAULT_ALLOCATOR_ID`.
const USER_ID_ALLOC_KEY: &str = "user";
/// Name of the "system" ID allocator; `initialize` seeds it with `DEFAULT_ALLOCATOR_ID`.
const SYSTEM_ID_ALLOC_KEY: &str = "system";

/// ID of the default user cluster that `initialize` creates.
const DEFAULT_USER_CLUSTER_ID: ClusterId = ClusterId::User(1);
/// Name of the default user cluster that `initialize` creates.
const DEFAULT_USER_CLUSTER_NAME: &str = "quickstart";

/// ID of the first replica of the default user cluster. Further replicas (one
/// per unit of the configured replication factor) get consecutive IDs after it.
const DEFAULT_USER_REPLICA_ID: ReplicaId = ReplicaId::User(1);

/// Numeric part of the ID of the default "materialize" database.
const MATERIALIZE_DATABASE_ID_VAL: u64 = 1;
/// ID of the default "materialize" database.
const MATERIALIZE_DATABASE_ID: DatabaseId = DatabaseId::User(MATERIALIZE_DATABASE_ID_VAL);

// Numeric IDs of the schemas created at initialization. `PUBLIC_SCHEMA_ID` is
// wrapped in `SchemaId::User`, all others in `SchemaId::System`. `initialize`
// seeds the schema ID allocator with the maximum of all eight values plus one,
// so a new schema constant must also be added to that list.
/// Numeric ID of the `mz_catalog` system schema.
const MZ_CATALOG_SCHEMA_ID: u64 = 1;
/// Numeric ID of the `pg_catalog` system schema.
const PG_CATALOG_SCHEMA_ID: u64 = 2;
/// Numeric ID of the `public` schema in the "materialize" database.
const PUBLIC_SCHEMA_ID: u64 = 3;
/// Numeric ID of the `mz_internal` system schema.
const MZ_INTERNAL_SCHEMA_ID: u64 = 4;
/// Numeric ID of the `information_schema` system schema.
const INFORMATION_SCHEMA_ID: u64 = 5;
/// Numeric ID of the `mz_unsafe` system schema.
pub const MZ_UNSAFE_SCHEMA_ID: u64 = 6;
/// Numeric ID of the `mz_catalog_unstable` system schema.
pub const MZ_CATALOG_UNSTABLE_SCHEMA_ID: u64 = 7;
/// Numeric ID of the `mz_introspection` system schema.
pub const MZ_INTROSPECTION_SCHEMA_ID: u64 = 8;

/// Initial "next ID" for every allocator that has no IDs handed out ahead of
/// time by `initialize`.
const DEFAULT_ALLOCATOR_ID: u64 = 1;
108
/// ID of the network policy that `initialize` inserts by default.
pub const DEFAULT_USER_NETWORK_POLICY_ID: NetworkPolicyId = NetworkPolicyId::User(1);
/// Name of the network policy that `initialize` inserts by default.
pub const DEFAULT_USER_NETWORK_POLICY_NAME: &str = "default";
/// Rules of the default network policy, as `(name, action, direction, address)`
/// tuples.
///
/// The address is kept as a string and parsed with `IpNet::from_str` when the
/// policy is inserted, so every entry must be a valid IP network; an invalid
/// one makes `initialize` panic. The single default rule allows ingress from
/// `0.0.0.0/0`.
pub const DEFAULT_USER_NETWORK_POLICY_RULES: &[(
    &str,
    mz_sql::plan::NetworkPolicyRuleAction,
    mz_sql::plan::NetworkPolicyRuleDirection,
    &str,
)] = &[(
    "open_ingress",
    mz_sql::plan::NetworkPolicyRuleAction::Allow,
    mz_sql::plan::NetworkPolicyRuleDirection::Ingress,
    "0.0.0.0/0",
)];
122
123static DEFAULT_USER_NETWORK_POLICY_PRIVILEGES: LazyLock<Vec<MzAclItem>> = LazyLock::new(|| {
124    vec![rbac::owner_privilege(
125        ObjectType::NetworkPolicy,
126        MZ_SYSTEM_ROLE_ID,
127    )]
128});
129
130static SYSTEM_SCHEMA_PRIVILEGES: LazyLock<Vec<MzAclItem>> = LazyLock::new(|| {
131    vec![
132        rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Schema),
133        MzAclItem {
134            grantee: MZ_SUPPORT_ROLE_ID,
135            grantor: MZ_SYSTEM_ROLE_ID,
136            acl_mode: AclMode::USAGE,
137        },
138        rbac::owner_privilege(mz_sql::catalog::ObjectType::Schema, MZ_SYSTEM_ROLE_ID),
139    ]
140});
141
142static MZ_CATALOG_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
143    id: SchemaId::System(MZ_CATALOG_SCHEMA_ID),
144    oid: SCHEMA_MZ_CATALOG_OID,
145    database_id: None,
146    name: "mz_catalog".to_string(),
147    owner_id: MZ_SYSTEM_ROLE_ID,
148    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
149});
150static PG_CATALOG_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
151    id: SchemaId::System(PG_CATALOG_SCHEMA_ID),
152    oid: SCHEMA_PG_CATALOG_OID,
153    database_id: None,
154    name: "pg_catalog".to_string(),
155    owner_id: MZ_SYSTEM_ROLE_ID,
156    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
157});
158static MZ_INTERNAL_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
159    id: SchemaId::System(MZ_INTERNAL_SCHEMA_ID),
160    oid: SCHEMA_MZ_INTERNAL_OID,
161    database_id: None,
162    name: "mz_internal".to_string(),
163    owner_id: MZ_SYSTEM_ROLE_ID,
164    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
165});
166static INFORMATION_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
167    id: SchemaId::System(INFORMATION_SCHEMA_ID),
168    oid: SCHEMA_INFORMATION_SCHEMA_OID,
169    database_id: None,
170    name: "information_schema".to_string(),
171    owner_id: MZ_SYSTEM_ROLE_ID,
172    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
173});
174static MZ_UNSAFE_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
175    id: SchemaId::System(MZ_UNSAFE_SCHEMA_ID),
176    oid: SCHEMA_MZ_UNSAFE_OID,
177    database_id: None,
178    name: "mz_unsafe".to_string(),
179    owner_id: MZ_SYSTEM_ROLE_ID,
180    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
181});
182static MZ_CATALOG_UNSTABLE_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
183    id: SchemaId::System(MZ_CATALOG_UNSTABLE_SCHEMA_ID),
184    oid: SCHEMA_MZ_CATALOG_UNSTABLE_OID,
185    database_id: None,
186    name: "mz_catalog_unstable".to_string(),
187    owner_id: MZ_SYSTEM_ROLE_ID,
188    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
189});
190static MZ_INTROSPECTION_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
191    id: SchemaId::System(MZ_INTROSPECTION_SCHEMA_ID),
192    oid: SCHEMA_MZ_INTROSPECTION_OID,
193    database_id: None,
194    name: "mz_introspection".to_string(),
195    owner_id: MZ_SYSTEM_ROLE_ID,
196    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
197});
198static SYSTEM_SCHEMAS: LazyLock<BTreeMap<&str, &Schema>> = LazyLock::new(|| {
199    [
200        &*MZ_CATALOG_SCHEMA,
201        &*PG_CATALOG_SCHEMA,
202        &*MZ_INTERNAL_SCHEMA,
203        &*INFORMATION_SCHEMA,
204        &*MZ_UNSAFE_SCHEMA,
205        &*MZ_CATALOG_UNSTABLE_SCHEMA,
206        &*MZ_INTROSPECTION_SCHEMA,
207    ]
208    .into_iter()
209    .map(|s| (&*s.name, s))
210    .collect()
211});
212
213/// Initializes the Catalog with some default objects.
214#[mz_ore::instrument]
215pub(crate) async fn initialize(
216    tx: &mut Transaction<'_>,
217    options: &BootstrapArgs,
218    initial_ts: EpochMillis,
219    catalog_content_version: String,
220) -> Result<(), CatalogError> {
221    // Collect audit events so we can commit them once at the very end.
222    let mut audit_events = vec![];
223
224    for (name, next_id) in [
225        (USER_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
226        (SYSTEM_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
227        (
228            DATABASE_ID_ALLOC_KEY.to_string(),
229            MATERIALIZE_DATABASE_ID_VAL + 1,
230        ),
231        (
232            SCHEMA_ID_ALLOC_KEY.to_string(),
233            max(&[
234                MZ_CATALOG_SCHEMA_ID,
235                PG_CATALOG_SCHEMA_ID,
236                PUBLIC_SCHEMA_ID,
237                MZ_INTERNAL_SCHEMA_ID,
238                INFORMATION_SCHEMA_ID,
239                MZ_UNSAFE_SCHEMA_ID,
240                MZ_CATALOG_UNSTABLE_SCHEMA_ID,
241                MZ_INTROSPECTION_SCHEMA_ID,
242            ])
243            .expect("known to be non-empty")
244                + 1,
245        ),
246        (USER_ROLE_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
247        (
248            USER_CLUSTER_ID_ALLOC_KEY.to_string(),
249            DEFAULT_USER_CLUSTER_ID.inner_id() + 1,
250        ),
251        (
252            SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string(),
253            DEFAULT_ALLOCATOR_ID,
254        ),
255        (
256            USER_REPLICA_ID_ALLOC_KEY.to_string(),
257            DEFAULT_USER_REPLICA_ID.inner_id()
258                + u64::from(options.default_cluster_replication_factor),
259        ),
260        (
261            SYSTEM_REPLICA_ID_ALLOC_KEY.to_string(),
262            DEFAULT_ALLOCATOR_ID,
263        ),
264        (
265            USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string(),
266            DEFAULT_ALLOCATOR_ID,
267        ),
268        (AUDIT_LOG_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
269        (STORAGE_USAGE_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
270        (OID_ALLOC_KEY.to_string(), FIRST_USER_OID.into()),
271    ] {
272        tx.insert_id_allocator(name, next_id)?;
273    }
274
275    for role in BUILTIN_ROLES {
276        tx.insert_builtin_role(
277            role.id,
278            role.name.to_string(),
279            role.attributes.clone(),
280            RoleMembership::new(),
281            RoleVars::default(),
282            role.oid,
283        )?;
284    }
285    tx.insert_builtin_role(
286        RoleId::Public,
287        PUBLIC_ROLE_NAME.as_str().to_lowercase(),
288        RoleAttributesRaw::new(),
289        RoleMembership::new(),
290        RoleVars::default(),
291        ROLE_PUBLIC_OID,
292    )?;
293
294    // If provided, generate a new Id for the bootstrap role.
295    let bootstrap_role = if let Some(role) = &options.bootstrap_role {
296        let attributes = RoleAttributesRaw::new();
297        let membership = RoleMembership::new();
298        let vars = RoleVars::default();
299
300        let (id, oid) = tx.insert_user_role(
301            role.to_string(),
302            attributes.clone(),
303            membership.clone(),
304            vars.clone(),
305            &HashSet::new(),
306        )?;
307
308        audit_events.push((
309            mz_audit_log::EventType::Create,
310            mz_audit_log::ObjectType::Role,
311            mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
312                id: id.to_string(),
313                name: role.to_string(),
314            }),
315        ));
316
317        Some(Role {
318            id,
319            name: role.to_string(),
320            attributes: attributes.into(),
321            membership,
322            vars,
323            oid,
324        })
325    } else {
326        None
327    };
328
329    let default_privileges = [
330        // mz_support needs USAGE privileges on all clusters, databases, and schemas for
331        // debugging.
332        DefaultPrivilege {
333            object: DefaultPrivilegeObject {
334                role_id: RoleId::Public,
335                database_id: None,
336                schema_id: None,
337                object_type: mz_sql::catalog::ObjectType::Cluster,
338            },
339            acl_item: DefaultPrivilegeAclItem {
340                grantee: MZ_SUPPORT_ROLE_ID,
341                acl_mode: AclMode::USAGE,
342            },
343        },
344        DefaultPrivilege {
345            object: DefaultPrivilegeObject {
346                role_id: RoleId::Public,
347                database_id: None,
348                schema_id: None,
349                object_type: mz_sql::catalog::ObjectType::Database,
350            },
351            acl_item: DefaultPrivilegeAclItem {
352                grantee: MZ_SUPPORT_ROLE_ID,
353                acl_mode: AclMode::USAGE,
354            },
355        },
356        DefaultPrivilege {
357            object: DefaultPrivilegeObject {
358                role_id: RoleId::Public,
359                database_id: None,
360                schema_id: None,
361                object_type: mz_sql::catalog::ObjectType::Schema,
362            },
363            acl_item: DefaultPrivilegeAclItem {
364                grantee: MZ_SUPPORT_ROLE_ID,
365                acl_mode: AclMode::USAGE,
366            },
367        },
368        DefaultPrivilege {
369            object: DefaultPrivilegeObject {
370                role_id: RoleId::Public,
371                database_id: None,
372                schema_id: None,
373                object_type: mz_sql::catalog::ObjectType::Type,
374            },
375            acl_item: DefaultPrivilegeAclItem {
376                grantee: RoleId::Public,
377                acl_mode: AclMode::USAGE,
378            },
379        },
380    ];
381    tx.set_default_privileges(default_privileges.to_vec())?;
382    for DefaultPrivilege { object, acl_item } in default_privileges {
383        let object_type = match object.object_type {
384            ObjectType::Table => mz_audit_log::ObjectType::Table,
385            ObjectType::View => mz_audit_log::ObjectType::View,
386            ObjectType::MaterializedView => mz_audit_log::ObjectType::MaterializedView,
387            ObjectType::Source => mz_audit_log::ObjectType::Source,
388            ObjectType::Sink => mz_audit_log::ObjectType::Sink,
389            ObjectType::Index => mz_audit_log::ObjectType::Index,
390            ObjectType::Type => mz_audit_log::ObjectType::Type,
391            ObjectType::Role => mz_audit_log::ObjectType::Role,
392            ObjectType::Cluster => mz_audit_log::ObjectType::Cluster,
393            ObjectType::ClusterReplica => mz_audit_log::ObjectType::ClusterReplica,
394            ObjectType::Secret => mz_audit_log::ObjectType::Secret,
395            ObjectType::Connection => mz_audit_log::ObjectType::Connection,
396            ObjectType::Database => mz_audit_log::ObjectType::Database,
397            ObjectType::Schema => mz_audit_log::ObjectType::Schema,
398            ObjectType::Func => mz_audit_log::ObjectType::Func,
399            ObjectType::ContinualTask => mz_audit_log::ObjectType::ContinualTask,
400            ObjectType::NetworkPolicy => mz_audit_log::ObjectType::NetworkPolicy,
401        };
402        audit_events.push((
403            mz_audit_log::EventType::Grant,
404            object_type,
405            mz_audit_log::EventDetails::AlterDefaultPrivilegeV1(
406                mz_audit_log::AlterDefaultPrivilegeV1 {
407                    role_id: object.role_id.to_string(),
408                    database_id: object.database_id.map(|id| id.to_string()),
409                    schema_id: object.schema_id.map(|id| id.to_string()),
410                    grantee_id: acl_item.grantee.to_string(),
411                    privileges: acl_item.acl_mode.to_string(),
412                },
413            ),
414        ));
415    }
416
417    let mut db_privileges = vec![
418        MzAclItem {
419            grantee: RoleId::Public,
420            grantor: MZ_SYSTEM_ROLE_ID,
421            acl_mode: AclMode::USAGE,
422        },
423        MzAclItem {
424            grantee: MZ_SUPPORT_ROLE_ID,
425            grantor: MZ_SYSTEM_ROLE_ID,
426            acl_mode: AclMode::USAGE,
427        },
428        rbac::owner_privilege(mz_sql::catalog::ObjectType::Database, MZ_SYSTEM_ROLE_ID),
429    ];
430    // Optionally add a privilege for the bootstrap role.
431    if let Some(role) = &bootstrap_role {
432        db_privileges.push(MzAclItem {
433            grantee: role.id.clone(),
434            grantor: MZ_SYSTEM_ROLE_ID,
435            acl_mode: rbac::all_object_privileges(SystemObjectType::Object(
436                mz_sql::catalog::ObjectType::Database,
437            )),
438        })
439    };
440
441    let materialize_db_oid = tx.allocate_oid(&HashSet::new())?;
442    tx.insert_database(
443        MATERIALIZE_DATABASE_ID,
444        "materialize",
445        MZ_SYSTEM_ROLE_ID,
446        db_privileges,
447        materialize_db_oid,
448    )?;
449    audit_events.extend([
450        (
451            mz_audit_log::EventType::Create,
452            mz_audit_log::ObjectType::Database,
453            mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
454                id: MATERIALIZE_DATABASE_ID.to_string(),
455                name: "materialize".to_string(),
456            }),
457        ),
458        (
459            mz_audit_log::EventType::Grant,
460            mz_audit_log::ObjectType::Database,
461            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
462                object_id: ObjectId::Database(MATERIALIZE_DATABASE_ID).to_string(),
463                grantee_id: RoleId::Public.to_string(),
464                grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
465                privileges: AclMode::USAGE.to_string(),
466            }),
467        ),
468    ]);
469    // Optionally add a privilege for the bootstrap role.
470    if let Some(role) = &bootstrap_role {
471        let role_id: RoleId = role.id.clone();
472        audit_events.push((
473            mz_audit_log::EventType::Grant,
474            mz_audit_log::ObjectType::Database,
475            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
476                object_id: ObjectId::Database(MATERIALIZE_DATABASE_ID).to_string(),
477                grantee_id: role_id.to_string(),
478                grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
479                privileges: rbac::all_object_privileges(SystemObjectType::Object(
480                    mz_sql::catalog::ObjectType::Database,
481                ))
482                .to_string(),
483            }),
484        ));
485    }
486
487    let public_schema_oid = tx.allocate_oid(&HashSet::new())?;
488    let public_schema = Schema {
489        id: SchemaId::User(PUBLIC_SCHEMA_ID),
490        oid: public_schema_oid,
491        database_id: Some(MATERIALIZE_DATABASE_ID),
492        name: "public".to_string(),
493        owner_id: MZ_SYSTEM_ROLE_ID,
494        privileges: vec![
495            MzAclItem {
496                grantee: RoleId::Public,
497                grantor: MZ_SYSTEM_ROLE_ID,
498                acl_mode: AclMode::USAGE,
499            },
500            MzAclItem {
501                grantee: MZ_SUPPORT_ROLE_ID,
502                grantor: MZ_SYSTEM_ROLE_ID,
503                acl_mode: AclMode::USAGE,
504            },
505            rbac::owner_privilege(mz_sql::catalog::ObjectType::Schema, MZ_SYSTEM_ROLE_ID),
506        ]
507        .into_iter()
508        // Optionally add the bootstrap role to the public schema.
509        .chain(bootstrap_role.as_ref().map(|role| MzAclItem {
510            grantee: role.id.clone(),
511            grantor: MZ_SYSTEM_ROLE_ID,
512            acl_mode: rbac::all_object_privileges(SystemObjectType::Object(
513                mz_sql::catalog::ObjectType::Schema,
514            )),
515        }))
516        .collect(),
517    };
518
519    for schema in SYSTEM_SCHEMAS.values().chain(iter::once(&&public_schema)) {
520        tx.insert_schema(
521            schema.id,
522            schema.database_id,
523            schema.name.clone(),
524            schema.owner_id,
525            schema.privileges.clone(),
526            schema.oid,
527        )?;
528    }
529    audit_events.push((
530        mz_audit_log::EventType::Create,
531        mz_audit_log::ObjectType::Schema,
532        mz_audit_log::EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
533            id: PUBLIC_SCHEMA_ID.to_string(),
534            name: "public".to_string(),
535            database_name: Some("materialize".to_string()),
536        }),
537    ));
538    if let Some(role) = &bootstrap_role {
539        let role_id: RoleId = role.id.clone();
540        audit_events.push((
541            mz_audit_log::EventType::Grant,
542            mz_audit_log::ObjectType::Schema,
543            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
544                object_id: ObjectId::Schema((
545                    ResolvedDatabaseSpecifier::Id(MATERIALIZE_DATABASE_ID),
546                    SchemaSpecifier::Id(SchemaId::User(PUBLIC_SCHEMA_ID)),
547                ))
548                .to_string(),
549                grantee_id: role_id.to_string(),
550                grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
551                privileges: rbac::all_object_privileges(SystemObjectType::Object(
552                    mz_sql::catalog::ObjectType::Schema,
553                ))
554                .to_string(),
555            }),
556        ));
557    }
558
559    let mut cluster_privileges = vec![
560        MzAclItem {
561            grantee: RoleId::Public,
562            grantor: MZ_SYSTEM_ROLE_ID,
563            acl_mode: AclMode::USAGE,
564        },
565        MzAclItem {
566            grantee: MZ_SUPPORT_ROLE_ID,
567            grantor: MZ_SYSTEM_ROLE_ID,
568            acl_mode: AclMode::USAGE,
569        },
570        rbac::owner_privilege(mz_sql::catalog::ObjectType::Cluster, MZ_SYSTEM_ROLE_ID),
571    ];
572
573    // Optionally add a privilege for the bootstrap role.
574    if let Some(role) = &bootstrap_role {
575        cluster_privileges.push(MzAclItem {
576            grantee: role.id.clone(),
577            grantor: MZ_SYSTEM_ROLE_ID,
578            acl_mode: rbac::all_object_privileges(SystemObjectType::Object(
579                mz_sql::catalog::ObjectType::Cluster,
580            )),
581        });
582    };
583
584    tx.insert_network_policy(
585        DEFAULT_USER_NETWORK_POLICY_ID,
586        DEFAULT_USER_NETWORK_POLICY_NAME.to_string(),
587        DEFAULT_USER_NETWORK_POLICY_RULES
588            .into_iter()
589            .map(|(name, action, direction, ip_str)| NetworkPolicyRule {
590                name: name.to_string(),
591                action: action.clone(),
592                direction: direction.clone(),
593                address: PolicyAddress(
594                    IpNet::from_str(ip_str).expect("default policy must provide valid ip"),
595                ),
596            })
597            .collect::<Vec<NetworkPolicyRule>>(),
598        DEFAULT_USER_NETWORK_POLICY_PRIVILEGES.clone(),
599        MZ_SYSTEM_ROLE_ID,
600        NETWORK_POLICIES_DEFAULT_POLICY_OID,
601    )?;
602    // We created a network policy with a prefined ID user(1) and OID. We need
603    // to increment the id alloc key. It should be safe to assume that there's
604    // no user(1), as a sanity check, we'll assert this is the case.
605    let id = tx.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
606    assert!(DEFAULT_USER_NETWORK_POLICY_ID == NetworkPolicyId::User(id));
607
608    audit_events.extend([(
609        mz_audit_log::EventType::Create,
610        mz_audit_log::ObjectType::NetworkPolicy,
611        mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
612            id: DEFAULT_USER_NETWORK_POLICY_ID.to_string(),
613            name: DEFAULT_USER_NETWORK_POLICY_NAME.to_string(),
614        }),
615    )]);
616
617    tx.insert_user_cluster(
618        DEFAULT_USER_CLUSTER_ID,
619        DEFAULT_USER_CLUSTER_NAME,
620        Vec::new(),
621        MZ_SYSTEM_ROLE_ID,
622        cluster_privileges,
623        default_cluster_config(options)?,
624        &HashSet::new(),
625    )?;
626    audit_events.extend([
627        (
628            mz_audit_log::EventType::Create,
629            mz_audit_log::ObjectType::Cluster,
630            mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
631                id: DEFAULT_USER_CLUSTER_ID.to_string(),
632                name: DEFAULT_USER_CLUSTER_NAME.to_string(),
633            }),
634        ),
635        (
636            mz_audit_log::EventType::Grant,
637            mz_audit_log::ObjectType::Cluster,
638            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
639                object_id: ObjectId::Cluster(DEFAULT_USER_CLUSTER_ID).to_string(),
640                grantee_id: RoleId::Public.to_string(),
641                grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
642                privileges: AclMode::USAGE.to_string(),
643            }),
644        ),
645    ]);
646
647    // Optionally add a privilege for the bootstrap role.
648    if let Some(role) = &bootstrap_role {
649        let role_id: RoleId = role.id.clone();
650        audit_events.push((
651            mz_audit_log::EventType::Grant,
652            mz_audit_log::ObjectType::Cluster,
653            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
654                object_id: ObjectId::Cluster(DEFAULT_USER_CLUSTER_ID).to_string(),
655                grantee_id: role_id.to_string(),
656                grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
657                privileges: rbac::all_object_privileges(SystemObjectType::Object(
658                    mz_sql::catalog::ObjectType::Cluster,
659                ))
660                .to_string(),
661            }),
662        ));
663    }
664
665    for i in 0..options.default_cluster_replication_factor {
666        let replica_id = ReplicaId::User(DEFAULT_USER_REPLICA_ID.inner_id() + u64::from(i));
667        let replica_name = format!("r{}", i + 1);
668        tx.insert_cluster_replica_with_id(
669            DEFAULT_USER_CLUSTER_ID,
670            replica_id,
671            &replica_name,
672            default_replica_config(options)?,
673            MZ_SYSTEM_ROLE_ID,
674        )?;
675        audit_events.push((
676            mz_audit_log::EventType::Create,
677            mz_audit_log::ObjectType::ClusterReplica,
678            mz_audit_log::EventDetails::CreateClusterReplicaV4(
679                mz_audit_log::CreateClusterReplicaV4 {
680                    cluster_id: DEFAULT_USER_CLUSTER_ID.to_string(),
681                    cluster_name: DEFAULT_USER_CLUSTER_NAME.to_string(),
682                    replica_name,
683                    replica_id: Some(replica_id.to_string()),
684                    logical_size: options.default_cluster_replica_size.to_string(),
685                    billed_as: None,
686                    internal: false,
687                    reason: CreateOrDropClusterReplicaReasonV1::System,
688                    scheduling_policies: None,
689                },
690            ),
691        ));
692    }
693
694    let system_privileges = [MzAclItem {
695        grantee: MZ_SYSTEM_ROLE_ID,
696        grantor: MZ_SYSTEM_ROLE_ID,
697        acl_mode: rbac::all_object_privileges(SystemObjectType::System),
698    }]
699    .into_iter()
700    // Optionally add system privileges for the bootstrap role.
701    .chain(bootstrap_role.as_ref().map(|role| MzAclItem {
702        grantee: role.id.clone(),
703        grantor: MZ_SYSTEM_ROLE_ID,
704        acl_mode: rbac::all_object_privileges(SystemObjectType::System),
705    }));
706    tx.set_system_privileges(system_privileges.clone().collect())?;
707    for system_privilege in system_privileges {
708        audit_events.push((
709            mz_audit_log::EventType::Grant,
710            mz_audit_log::ObjectType::System,
711            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
712                object_id: "SYSTEM".to_string(),
713                grantee_id: system_privilege.grantee.to_string(),
714                grantor_id: system_privilege.grantor.to_string(),
715                privileges: system_privilege.acl_mode.to_string(),
716            }),
717        ));
718    }
719
720    // Allocate an ID for each audit log event.
721    let mut audit_events_with_id = Vec::with_capacity(audit_events.len());
722    for (ty, obj, details) in audit_events {
723        let id = tx.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())?;
724        audit_events_with_id.push((id, ty, obj, details));
725    }
726
727    for (id, event_type, object_type, details) in audit_events_with_id {
728        tx.insert_audit_log_event(VersionedEvent::V1(EventV1 {
729            id,
730            event_type,
731            object_type,
732            details,
733            user: None,
734            occurred_at: initial_ts,
735        }));
736    }
737
738    for (key, value) in [
739        (USER_VERSION_KEY.to_string(), CATALOG_VERSION),
740        (SYSTEM_CONFIG_SYNCED_KEY.to_string(), 0),
741    ] {
742        tx.insert_config(key, value)?;
743    }
744
745    for (name, value) in [
746        (
747            CATALOG_CONTENT_VERSION_KEY.to_string(),
748            catalog_content_version,
749        ),
750        (
751            BUILTIN_MIGRATION_SHARD_KEY.to_string(),
752            ShardId::new().to_string(),
753        ),
754        (
755            EXPRESSION_CACHE_SHARD_KEY.to_string(),
756            ShardId::new().to_string(),
757        ),
758    ] {
759        tx.set_setting(name, Some(value))?;
760    }
761
762    if tx
763        .get_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string())
764        .is_none()
765    {
766        let mut nonce = [0u8; 24];
767        openssl::rand::rand_bytes(&mut nonce).expect("random number generation failed");
768        tx.set_setting(
769            MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
770            Some(BASE64_STANDARD.encode(nonce)),
771        )?;
772    }
773
774    Ok(())
775}
776
777pub fn resolve_system_schema(name: &str) -> &Schema {
778    SYSTEM_SCHEMAS
779        .get(name)
780        .unwrap_or_else(|| panic!("unable to resolve system schema: {name}"))
781}
782
783/// Defines the default config for a Cluster.
784fn default_cluster_config(args: &BootstrapArgs) -> Result<ClusterConfig, CatalogError> {
785    Ok(ClusterConfig {
786        variant: ClusterVariant::Managed(ClusterVariantManaged {
787            size: args.default_cluster_replica_size.to_string(),
788            replication_factor: args.default_cluster_replication_factor,
789            availability_zones: vec![],
790            logging: ReplicaLogging {
791                log_logging: false,
792                interval: Some(Duration::from_secs(1)),
793            },
794            optimizer_feature_overrides: Default::default(),
795            schedule: Default::default(),
796        }),
797        workload_class: None,
798    })
799}
800
801/// Defines the default config for a Cluster Replica.
802fn default_replica_config(args: &BootstrapArgs) -> Result<ReplicaConfig, CatalogError> {
803    Ok(ReplicaConfig {
804        location: ReplicaLocation::Managed {
805            size: args.default_cluster_replica_size.to_string(),
806            availability_zone: None,
807            internal: false,
808            billed_as: None,
809            pending: false,
810        },
811        logging: ReplicaLogging {
812            log_logging: false,
813            interval: Some(Duration::from_secs(1)),
814        },
815    })
816}