mz_catalog/durable/
initialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::iter;
12use std::str::FromStr;
13use std::sync::LazyLock;
14use std::time::Duration;
15
16use ipnet::IpNet;
17use itertools::max;
18use mz_audit_log::{CreateOrDropClusterReplicaReasonV1, EventV1, VersionedEvent};
19use mz_controller::clusters::ReplicaLogging;
20use mz_controller_types::{ClusterId, ReplicaId};
21use mz_ore::collections::HashSet;
22use mz_ore::now::EpochMillis;
23use mz_persist_types::ShardId;
24use mz_pgrepr::oid::{
25    FIRST_USER_OID, NETWORK_POLICIES_DEFAULT_POLICY_OID, ROLE_PUBLIC_OID,
26    SCHEMA_INFORMATION_SCHEMA_OID, SCHEMA_MZ_CATALOG_OID, SCHEMA_MZ_CATALOG_UNSTABLE_OID,
27    SCHEMA_MZ_INTERNAL_OID, SCHEMA_MZ_INTROSPECTION_OID, SCHEMA_MZ_UNSAFE_OID,
28    SCHEMA_PG_CATALOG_OID,
29};
30use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
31use mz_repr::network_policy_id::NetworkPolicyId;
32use mz_repr::role_id::RoleId;
33use mz_sql::catalog::{
34    DefaultPrivilegeAclItem, DefaultPrivilegeObject, ObjectType, RoleAttributes, RoleMembership,
35    RoleVars, SystemObjectType,
36};
37use mz_sql::names::{
38    DatabaseId, ObjectId, PUBLIC_ROLE_NAME, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier,
39};
40use mz_sql::plan::{NetworkPolicyRule, PolicyAddress};
41use mz_sql::rbac;
42use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
43
44use crate::builtin::BUILTIN_ROLES;
45use crate::durable::upgrade::CATALOG_VERSION;
46use crate::durable::{
47    AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, BootstrapArgs,
48    CATALOG_CONTENT_VERSION_KEY, CatalogError, ClusterConfig, ClusterVariant,
49    ClusterVariantManaged, DATABASE_ID_ALLOC_KEY, DefaultPrivilege, EXPRESSION_CACHE_SHARD_KEY,
50    OID_ALLOC_KEY, ReplicaConfig, ReplicaLocation, Role, SCHEMA_ID_ALLOC_KEY,
51    STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY, SYSTEM_REPLICA_ID_ALLOC_KEY, Schema,
52    Transaction, USER_CLUSTER_ID_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY,
53    USER_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY,
54};
55
/// The key within the "config" collection that stores the version of the catalog.
pub const USER_VERSION_KEY: &str = "user_version";
/// The key within the "config" collection that stores whether the remote configuration was
/// synchronized at least once.
pub(crate) const SYSTEM_CONFIG_SYNCED_KEY: &str = "system_config_synced";

/// The key used within the "config" collection where we store a mirror of the
/// `enable_0dt_deployment` "system var" value. This is mirrored so that we can
/// toggle the flag with LaunchDarkly, but use it in boot before LaunchDarkly is
/// available.
pub(crate) const ENABLE_0DT_DEPLOYMENT: &str = "enable_0dt_deployment";

/// The key used within the "config" collection where we store a mirror of the
/// `with_0dt_deployment_max_wait` "system var" value. This is mirrored so that
/// we can toggle the flag with LaunchDarkly, but use it in boot before
/// LaunchDarkly is available.
///
/// NOTE: Weird prefix because we can't start with a `0`.
pub(crate) const WITH_0DT_DEPLOYMENT_MAX_WAIT: &str = "with_0dt_deployment_max_wait";

/// The key used within the "config" collection where we store a mirror of the
/// `with_0dt_deployment_ddl_check_interval` "system var" value. This is
/// mirrored so that we can toggle the flag with LaunchDarkly, but use it in
/// boot before LaunchDarkly is available.
///
/// NOTE: Weird prefix because we can't start with a `0`.
pub(crate) const WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL: &str =
    "with_0dt_deployment_ddl_check_interval";

/// The key used within the "config" collection where we store a mirror of the
/// `enable_0dt_deployment_panic_after_timeout` "system var" value. This is
/// mirrored so that we can toggle the flag with LaunchDarkly, but use it in
/// boot before LaunchDarkly is available.
pub(crate) const ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT: &str =
    "enable_0dt_deployment_panic_after_timeout";

/// Name of the "user" ID allocator that `initialize` seeds with `DEFAULT_ALLOCATOR_ID`.
const USER_ID_ALLOC_KEY: &str = "user";
/// Name of the "system" ID allocator that `initialize` seeds with `DEFAULT_ALLOCATOR_ID`.
const SYSTEM_ID_ALLOC_KEY: &str = "system";

/// ID of the user cluster that `initialize` creates by default.
const DEFAULT_USER_CLUSTER_ID: ClusterId = ClusterId::User(1);
/// Name of the user cluster that `initialize` creates by default.
const DEFAULT_USER_CLUSTER_NAME: &str = "quickstart";

/// ID of the first replica of the default user cluster; additional replicas created by
/// `initialize` use the consecutive IDs that follow it.
const DEFAULT_USER_REPLICA_ID: ReplicaId = ReplicaId::User(1);

/// Raw numeric ID of the "materialize" database created by `initialize`.
const MATERIALIZE_DATABASE_ID_VAL: u64 = 1;
/// Typed ID of the "materialize" database created by `initialize`.
const MATERIALIZE_DATABASE_ID: DatabaseId = DatabaseId::User(MATERIALIZE_DATABASE_ID_VAL);

// Raw numeric IDs of the schemas created by `initialize`. The "public" schema is wrapped in
// `SchemaId::User` while the others are wrapped in `SchemaId::System`, but all of them are
// drawn from one sequence: the schema ID allocator is seeded with the maximum of these
// values plus one.
const MZ_CATALOG_SCHEMA_ID: u64 = 1;
const PG_CATALOG_SCHEMA_ID: u64 = 2;
const PUBLIC_SCHEMA_ID: u64 = 3;
const MZ_INTERNAL_SCHEMA_ID: u64 = 4;
const INFORMATION_SCHEMA_ID: u64 = 5;
pub const MZ_UNSAFE_SCHEMA_ID: u64 = 6;
pub const MZ_CATALOG_UNSTABLE_SCHEMA_ID: u64 = 7;
pub const MZ_INTROSPECTION_SCHEMA_ID: u64 = 8;

/// Starting `next_id` that `initialize` gives to every ID allocator that does not have to
/// skip past pre-assigned IDs.
const DEFAULT_ALLOCATOR_ID: u64 = 1;

/// ID of the network policy that `initialize` creates by default.
pub const DEFAULT_USER_NETWORK_POLICY_ID: NetworkPolicyId = NetworkPolicyId::User(1);
/// Name of the network policy that `initialize` creates by default.
pub const DEFAULT_USER_NETWORK_POLICY_NAME: &str = "default";
/// Rules of the default network policy, as `(name, action, direction, address)` tuples.
///
/// The address is kept as a string here and is parsed into an `IpNet` by `initialize`, so
/// every entry must be a valid IP network in CIDR notation.
pub const DEFAULT_USER_NETWORK_POLICY_RULES: &[(
    &str,
    mz_sql::plan::NetworkPolicyRuleAction,
    mz_sql::plan::NetworkPolicyRuleDirection,
    &str,
)] = &[(
    "open_ingress",
    mz_sql::plan::NetworkPolicyRuleAction::Allow,
    mz_sql::plan::NetworkPolicyRuleDirection::Ingress,
    "0.0.0.0/0",
)];
127
128static DEFAULT_USER_NETWORK_POLICY_PRIVILEGES: LazyLock<Vec<MzAclItem>> = LazyLock::new(|| {
129    vec![rbac::owner_privilege(
130        ObjectType::NetworkPolicy,
131        MZ_SYSTEM_ROLE_ID,
132    )]
133});
134
135static SYSTEM_SCHEMA_PRIVILEGES: LazyLock<Vec<MzAclItem>> = LazyLock::new(|| {
136    vec![
137        rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Schema),
138        MzAclItem {
139            grantee: MZ_SUPPORT_ROLE_ID,
140            grantor: MZ_SYSTEM_ROLE_ID,
141            acl_mode: AclMode::USAGE,
142        },
143        rbac::owner_privilege(mz_sql::catalog::ObjectType::Schema, MZ_SYSTEM_ROLE_ID),
144    ]
145});
146
147static MZ_CATALOG_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
148    id: SchemaId::System(MZ_CATALOG_SCHEMA_ID),
149    oid: SCHEMA_MZ_CATALOG_OID,
150    database_id: None,
151    name: "mz_catalog".to_string(),
152    owner_id: MZ_SYSTEM_ROLE_ID,
153    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
154});
155static PG_CATALOG_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
156    id: SchemaId::System(PG_CATALOG_SCHEMA_ID),
157    oid: SCHEMA_PG_CATALOG_OID,
158    database_id: None,
159    name: "pg_catalog".to_string(),
160    owner_id: MZ_SYSTEM_ROLE_ID,
161    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
162});
163static MZ_INTERNAL_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
164    id: SchemaId::System(MZ_INTERNAL_SCHEMA_ID),
165    oid: SCHEMA_MZ_INTERNAL_OID,
166    database_id: None,
167    name: "mz_internal".to_string(),
168    owner_id: MZ_SYSTEM_ROLE_ID,
169    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
170});
171static INFORMATION_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
172    id: SchemaId::System(INFORMATION_SCHEMA_ID),
173    oid: SCHEMA_INFORMATION_SCHEMA_OID,
174    database_id: None,
175    name: "information_schema".to_string(),
176    owner_id: MZ_SYSTEM_ROLE_ID,
177    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
178});
179static MZ_UNSAFE_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
180    id: SchemaId::System(MZ_UNSAFE_SCHEMA_ID),
181    oid: SCHEMA_MZ_UNSAFE_OID,
182    database_id: None,
183    name: "mz_unsafe".to_string(),
184    owner_id: MZ_SYSTEM_ROLE_ID,
185    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
186});
187static MZ_CATALOG_UNSTABLE_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
188    id: SchemaId::System(MZ_CATALOG_UNSTABLE_SCHEMA_ID),
189    oid: SCHEMA_MZ_CATALOG_UNSTABLE_OID,
190    database_id: None,
191    name: "mz_catalog_unstable".to_string(),
192    owner_id: MZ_SYSTEM_ROLE_ID,
193    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
194});
195static MZ_INTROSPECTION_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
196    id: SchemaId::System(MZ_INTROSPECTION_SCHEMA_ID),
197    oid: SCHEMA_MZ_INTROSPECTION_OID,
198    database_id: None,
199    name: "mz_introspection".to_string(),
200    owner_id: MZ_SYSTEM_ROLE_ID,
201    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
202});
203static SYSTEM_SCHEMAS: LazyLock<BTreeMap<&str, &Schema>> = LazyLock::new(|| {
204    [
205        &*MZ_CATALOG_SCHEMA,
206        &*PG_CATALOG_SCHEMA,
207        &*MZ_INTERNAL_SCHEMA,
208        &*INFORMATION_SCHEMA,
209        &*MZ_UNSAFE_SCHEMA,
210        &*MZ_CATALOG_UNSTABLE_SCHEMA,
211        &*MZ_INTROSPECTION_SCHEMA,
212    ]
213    .into_iter()
214    .map(|s| (&*s.name, s))
215    .collect()
216});
217
218/// Initializes the Catalog with some default objects.
219#[mz_ore::instrument]
220pub(crate) async fn initialize(
221    tx: &mut Transaction<'_>,
222    options: &BootstrapArgs,
223    initial_ts: EpochMillis,
224    catalog_content_version: String,
225) -> Result<(), CatalogError> {
226    // Collect audit events so we can commit them once at the very end.
227    let mut audit_events = vec![];
228
229    for (name, next_id) in [
230        (USER_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
231        (SYSTEM_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
232        (
233            DATABASE_ID_ALLOC_KEY.to_string(),
234            MATERIALIZE_DATABASE_ID_VAL + 1,
235        ),
236        (
237            SCHEMA_ID_ALLOC_KEY.to_string(),
238            max(&[
239                MZ_CATALOG_SCHEMA_ID,
240                PG_CATALOG_SCHEMA_ID,
241                PUBLIC_SCHEMA_ID,
242                MZ_INTERNAL_SCHEMA_ID,
243                INFORMATION_SCHEMA_ID,
244                MZ_UNSAFE_SCHEMA_ID,
245                MZ_CATALOG_UNSTABLE_SCHEMA_ID,
246                MZ_INTROSPECTION_SCHEMA_ID,
247            ])
248            .expect("known to be non-empty")
249                + 1,
250        ),
251        (USER_ROLE_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
252        (
253            USER_CLUSTER_ID_ALLOC_KEY.to_string(),
254            DEFAULT_USER_CLUSTER_ID.inner_id() + 1,
255        ),
256        (
257            SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string(),
258            DEFAULT_ALLOCATOR_ID,
259        ),
260        (
261            USER_REPLICA_ID_ALLOC_KEY.to_string(),
262            DEFAULT_USER_REPLICA_ID.inner_id()
263                + u64::from(options.default_cluster_replication_factor),
264        ),
265        (
266            SYSTEM_REPLICA_ID_ALLOC_KEY.to_string(),
267            DEFAULT_ALLOCATOR_ID,
268        ),
269        (
270            USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string(),
271            DEFAULT_ALLOCATOR_ID,
272        ),
273        (AUDIT_LOG_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
274        (STORAGE_USAGE_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
275        (OID_ALLOC_KEY.to_string(), FIRST_USER_OID.into()),
276    ] {
277        tx.insert_id_allocator(name, next_id)?;
278    }
279
280    for role in BUILTIN_ROLES {
281        tx.insert_builtin_role(
282            role.id,
283            role.name.to_string(),
284            role.attributes.clone(),
285            RoleMembership::new(),
286            RoleVars::default(),
287            role.oid,
288        )?;
289    }
290    tx.insert_builtin_role(
291        RoleId::Public,
292        PUBLIC_ROLE_NAME.as_str().to_lowercase(),
293        RoleAttributes::new(),
294        RoleMembership::new(),
295        RoleVars::default(),
296        ROLE_PUBLIC_OID,
297    )?;
298
299    // If provided, generate a new Id for the bootstrap role.
300    let bootstrap_role = if let Some(role) = &options.bootstrap_role {
301        let attributes = RoleAttributes::new();
302        let membership = RoleMembership::new();
303        let vars = RoleVars::default();
304
305        let (id, oid) = tx.insert_user_role(
306            role.to_string(),
307            attributes.clone(),
308            membership.clone(),
309            vars.clone(),
310            &HashSet::new(),
311        )?;
312
313        audit_events.push((
314            mz_audit_log::EventType::Create,
315            mz_audit_log::ObjectType::Role,
316            mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
317                id: id.to_string(),
318                name: role.to_string(),
319            }),
320        ));
321
322        Some(Role {
323            id,
324            name: role.to_string(),
325            attributes,
326            membership,
327            vars,
328            oid,
329        })
330    } else {
331        None
332    };
333
334    let default_privileges = [
335        // mz_support needs USAGE privileges on all clusters, databases, and schemas for
336        // debugging.
337        DefaultPrivilege {
338            object: DefaultPrivilegeObject {
339                role_id: RoleId::Public,
340                database_id: None,
341                schema_id: None,
342                object_type: mz_sql::catalog::ObjectType::Cluster,
343            },
344            acl_item: DefaultPrivilegeAclItem {
345                grantee: MZ_SUPPORT_ROLE_ID,
346                acl_mode: AclMode::USAGE,
347            },
348        },
349        DefaultPrivilege {
350            object: DefaultPrivilegeObject {
351                role_id: RoleId::Public,
352                database_id: None,
353                schema_id: None,
354                object_type: mz_sql::catalog::ObjectType::Database,
355            },
356            acl_item: DefaultPrivilegeAclItem {
357                grantee: MZ_SUPPORT_ROLE_ID,
358                acl_mode: AclMode::USAGE,
359            },
360        },
361        DefaultPrivilege {
362            object: DefaultPrivilegeObject {
363                role_id: RoleId::Public,
364                database_id: None,
365                schema_id: None,
366                object_type: mz_sql::catalog::ObjectType::Schema,
367            },
368            acl_item: DefaultPrivilegeAclItem {
369                grantee: MZ_SUPPORT_ROLE_ID,
370                acl_mode: AclMode::USAGE,
371            },
372        },
373        DefaultPrivilege {
374            object: DefaultPrivilegeObject {
375                role_id: RoleId::Public,
376                database_id: None,
377                schema_id: None,
378                object_type: mz_sql::catalog::ObjectType::Type,
379            },
380            acl_item: DefaultPrivilegeAclItem {
381                grantee: RoleId::Public,
382                acl_mode: AclMode::USAGE,
383            },
384        },
385    ];
386    tx.set_default_privileges(default_privileges.to_vec())?;
387    for DefaultPrivilege { object, acl_item } in default_privileges {
388        let object_type = match object.object_type {
389            ObjectType::Table => mz_audit_log::ObjectType::Table,
390            ObjectType::View => mz_audit_log::ObjectType::View,
391            ObjectType::MaterializedView => mz_audit_log::ObjectType::MaterializedView,
392            ObjectType::Source => mz_audit_log::ObjectType::Source,
393            ObjectType::Sink => mz_audit_log::ObjectType::Sink,
394            ObjectType::Index => mz_audit_log::ObjectType::Index,
395            ObjectType::Type => mz_audit_log::ObjectType::Type,
396            ObjectType::Role => mz_audit_log::ObjectType::Role,
397            ObjectType::Cluster => mz_audit_log::ObjectType::Cluster,
398            ObjectType::ClusterReplica => mz_audit_log::ObjectType::ClusterReplica,
399            ObjectType::Secret => mz_audit_log::ObjectType::Secret,
400            ObjectType::Connection => mz_audit_log::ObjectType::Connection,
401            ObjectType::Database => mz_audit_log::ObjectType::Database,
402            ObjectType::Schema => mz_audit_log::ObjectType::Schema,
403            ObjectType::Func => mz_audit_log::ObjectType::Func,
404            ObjectType::ContinualTask => mz_audit_log::ObjectType::ContinualTask,
405            ObjectType::NetworkPolicy => mz_audit_log::ObjectType::NetworkPolicy,
406        };
407        audit_events.push((
408            mz_audit_log::EventType::Grant,
409            object_type,
410            mz_audit_log::EventDetails::AlterDefaultPrivilegeV1(
411                mz_audit_log::AlterDefaultPrivilegeV1 {
412                    role_id: object.role_id.to_string(),
413                    database_id: object.database_id.map(|id| id.to_string()),
414                    schema_id: object.schema_id.map(|id| id.to_string()),
415                    grantee_id: acl_item.grantee.to_string(),
416                    privileges: acl_item.acl_mode.to_string(),
417                },
418            ),
419        ));
420    }
421
422    let mut db_privileges = vec![
423        MzAclItem {
424            grantee: RoleId::Public,
425            grantor: MZ_SYSTEM_ROLE_ID,
426            acl_mode: AclMode::USAGE,
427        },
428        MzAclItem {
429            grantee: MZ_SUPPORT_ROLE_ID,
430            grantor: MZ_SYSTEM_ROLE_ID,
431            acl_mode: AclMode::USAGE,
432        },
433        rbac::owner_privilege(mz_sql::catalog::ObjectType::Database, MZ_SYSTEM_ROLE_ID),
434    ];
435    // Optionally add a privilege for the bootstrap role.
436    if let Some(role) = &bootstrap_role {
437        db_privileges.push(MzAclItem {
438            grantee: role.id.clone(),
439            grantor: MZ_SYSTEM_ROLE_ID,
440            acl_mode: rbac::all_object_privileges(SystemObjectType::Object(
441                mz_sql::catalog::ObjectType::Database,
442            )),
443        })
444    };
445
446    let materialize_db_oid = tx.allocate_oid(&HashSet::new())?;
447    tx.insert_database(
448        MATERIALIZE_DATABASE_ID,
449        "materialize",
450        MZ_SYSTEM_ROLE_ID,
451        db_privileges,
452        materialize_db_oid,
453    )?;
454    audit_events.extend([
455        (
456            mz_audit_log::EventType::Create,
457            mz_audit_log::ObjectType::Database,
458            mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
459                id: MATERIALIZE_DATABASE_ID.to_string(),
460                name: "materialize".to_string(),
461            }),
462        ),
463        (
464            mz_audit_log::EventType::Grant,
465            mz_audit_log::ObjectType::Database,
466            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
467                object_id: ObjectId::Database(MATERIALIZE_DATABASE_ID).to_string(),
468                grantee_id: RoleId::Public.to_string(),
469                grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
470                privileges: AclMode::USAGE.to_string(),
471            }),
472        ),
473    ]);
474    // Optionally add a privilege for the bootstrap role.
475    if let Some(role) = &bootstrap_role {
476        let role_id: RoleId = role.id.clone();
477        audit_events.push((
478            mz_audit_log::EventType::Grant,
479            mz_audit_log::ObjectType::Database,
480            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
481                object_id: ObjectId::Database(MATERIALIZE_DATABASE_ID).to_string(),
482                grantee_id: role_id.to_string(),
483                grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
484                privileges: rbac::all_object_privileges(SystemObjectType::Object(
485                    mz_sql::catalog::ObjectType::Database,
486                ))
487                .to_string(),
488            }),
489        ));
490    }
491
492    let public_schema_oid = tx.allocate_oid(&HashSet::new())?;
493    let public_schema = Schema {
494        id: SchemaId::User(PUBLIC_SCHEMA_ID),
495        oid: public_schema_oid,
496        database_id: Some(MATERIALIZE_DATABASE_ID),
497        name: "public".to_string(),
498        owner_id: MZ_SYSTEM_ROLE_ID,
499        privileges: vec![
500            MzAclItem {
501                grantee: RoleId::Public,
502                grantor: MZ_SYSTEM_ROLE_ID,
503                acl_mode: AclMode::USAGE,
504            },
505            MzAclItem {
506                grantee: MZ_SUPPORT_ROLE_ID,
507                grantor: MZ_SYSTEM_ROLE_ID,
508                acl_mode: AclMode::USAGE,
509            },
510            rbac::owner_privilege(mz_sql::catalog::ObjectType::Schema, MZ_SYSTEM_ROLE_ID),
511        ]
512        .into_iter()
513        // Optionally add the bootstrap role to the public schema.
514        .chain(bootstrap_role.as_ref().map(|role| MzAclItem {
515            grantee: role.id.clone(),
516            grantor: MZ_SYSTEM_ROLE_ID,
517            acl_mode: rbac::all_object_privileges(SystemObjectType::Object(
518                mz_sql::catalog::ObjectType::Schema,
519            )),
520        }))
521        .collect(),
522    };
523
524    for schema in SYSTEM_SCHEMAS.values().chain(iter::once(&&public_schema)) {
525        tx.insert_schema(
526            schema.id,
527            schema.database_id,
528            schema.name.clone(),
529            schema.owner_id,
530            schema.privileges.clone(),
531            schema.oid,
532        )?;
533    }
534    audit_events.push((
535        mz_audit_log::EventType::Create,
536        mz_audit_log::ObjectType::Schema,
537        mz_audit_log::EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
538            id: PUBLIC_SCHEMA_ID.to_string(),
539            name: "public".to_string(),
540            database_name: Some("materialize".to_string()),
541        }),
542    ));
543    if let Some(role) = &bootstrap_role {
544        let role_id: RoleId = role.id.clone();
545        audit_events.push((
546            mz_audit_log::EventType::Grant,
547            mz_audit_log::ObjectType::Schema,
548            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
549                object_id: ObjectId::Schema((
550                    ResolvedDatabaseSpecifier::Id(MATERIALIZE_DATABASE_ID),
551                    SchemaSpecifier::Id(SchemaId::User(PUBLIC_SCHEMA_ID)),
552                ))
553                .to_string(),
554                grantee_id: role_id.to_string(),
555                grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
556                privileges: rbac::all_object_privileges(SystemObjectType::Object(
557                    mz_sql::catalog::ObjectType::Schema,
558                ))
559                .to_string(),
560            }),
561        ));
562    }
563
564    let mut cluster_privileges = vec![
565        MzAclItem {
566            grantee: RoleId::Public,
567            grantor: MZ_SYSTEM_ROLE_ID,
568            acl_mode: AclMode::USAGE,
569        },
570        MzAclItem {
571            grantee: MZ_SUPPORT_ROLE_ID,
572            grantor: MZ_SYSTEM_ROLE_ID,
573            acl_mode: AclMode::USAGE,
574        },
575        rbac::owner_privilege(mz_sql::catalog::ObjectType::Cluster, MZ_SYSTEM_ROLE_ID),
576    ];
577
578    // Optionally add a privilege for the bootstrap role.
579    if let Some(role) = &bootstrap_role {
580        cluster_privileges.push(MzAclItem {
581            grantee: role.id.clone(),
582            grantor: MZ_SYSTEM_ROLE_ID,
583            acl_mode: rbac::all_object_privileges(SystemObjectType::Object(
584                mz_sql::catalog::ObjectType::Cluster,
585            )),
586        });
587    };
588
589    tx.insert_network_policy(
590        DEFAULT_USER_NETWORK_POLICY_ID,
591        DEFAULT_USER_NETWORK_POLICY_NAME.to_string(),
592        DEFAULT_USER_NETWORK_POLICY_RULES
593            .into_iter()
594            .map(|(name, action, direction, ip_str)| NetworkPolicyRule {
595                name: name.to_string(),
596                action: action.clone(),
597                direction: direction.clone(),
598                address: PolicyAddress(
599                    IpNet::from_str(ip_str).expect("default policy must provide valid ip"),
600                ),
601            })
602            .collect::<Vec<NetworkPolicyRule>>(),
603        DEFAULT_USER_NETWORK_POLICY_PRIVILEGES.clone(),
604        MZ_SYSTEM_ROLE_ID,
605        NETWORK_POLICIES_DEFAULT_POLICY_OID,
606    )?;
607    // We created a network policy with a prefined ID user(1) and OID. We need
608    // to increment the id alloc key. It should be safe to assume that there's
609    // no user(1), as a sanity check, we'll assert this is the case.
610    let id = tx.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
611    assert!(DEFAULT_USER_NETWORK_POLICY_ID == NetworkPolicyId::User(id));
612
613    audit_events.extend([(
614        mz_audit_log::EventType::Create,
615        mz_audit_log::ObjectType::NetworkPolicy,
616        mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
617            id: DEFAULT_USER_NETWORK_POLICY_ID.to_string(),
618            name: DEFAULT_USER_NETWORK_POLICY_NAME.to_string(),
619        }),
620    )]);
621
622    tx.insert_user_cluster(
623        DEFAULT_USER_CLUSTER_ID,
624        DEFAULT_USER_CLUSTER_NAME,
625        Vec::new(),
626        MZ_SYSTEM_ROLE_ID,
627        cluster_privileges,
628        default_cluster_config(options)?,
629        &HashSet::new(),
630    )?;
631    audit_events.extend([
632        (
633            mz_audit_log::EventType::Create,
634            mz_audit_log::ObjectType::Cluster,
635            mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
636                id: DEFAULT_USER_CLUSTER_ID.to_string(),
637                name: DEFAULT_USER_CLUSTER_NAME.to_string(),
638            }),
639        ),
640        (
641            mz_audit_log::EventType::Grant,
642            mz_audit_log::ObjectType::Cluster,
643            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
644                object_id: ObjectId::Cluster(DEFAULT_USER_CLUSTER_ID).to_string(),
645                grantee_id: RoleId::Public.to_string(),
646                grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
647                privileges: AclMode::USAGE.to_string(),
648            }),
649        ),
650    ]);
651
652    // Optionally add a privilege for the bootstrap role.
653    if let Some(role) = &bootstrap_role {
654        let role_id: RoleId = role.id.clone();
655        audit_events.push((
656            mz_audit_log::EventType::Grant,
657            mz_audit_log::ObjectType::Cluster,
658            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
659                object_id: ObjectId::Cluster(DEFAULT_USER_CLUSTER_ID).to_string(),
660                grantee_id: role_id.to_string(),
661                grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
662                privileges: rbac::all_object_privileges(SystemObjectType::Object(
663                    mz_sql::catalog::ObjectType::Cluster,
664                ))
665                .to_string(),
666            }),
667        ));
668    }
669
670    for i in 0..options.default_cluster_replication_factor {
671        let replica_id = ReplicaId::User(DEFAULT_USER_REPLICA_ID.inner_id() + u64::from(i));
672        let replica_name = format!("r{}", i + 1);
673        tx.insert_cluster_replica_with_id(
674            DEFAULT_USER_CLUSTER_ID,
675            replica_id,
676            &replica_name,
677            default_replica_config(options)?,
678            MZ_SYSTEM_ROLE_ID,
679        )?;
680        audit_events.push((
681            mz_audit_log::EventType::Create,
682            mz_audit_log::ObjectType::ClusterReplica,
683            mz_audit_log::EventDetails::CreateClusterReplicaV2(
684                mz_audit_log::CreateClusterReplicaV2 {
685                    cluster_id: DEFAULT_USER_CLUSTER_ID.to_string(),
686                    cluster_name: DEFAULT_USER_CLUSTER_NAME.to_string(),
687                    replica_name,
688                    replica_id: Some(replica_id.to_string()),
689                    logical_size: options.default_cluster_replica_size.to_string(),
690                    disk: {
691                        let cluster_size = options.default_cluster_replica_size.to_string();
692                        let cluster_allocation = options
693                            .cluster_replica_size_map
694                            .get_allocation_by_name(&cluster_size)?;
695                        cluster_allocation.is_cc
696                    },
697                    billed_as: None,
698                    internal: false,
699                    reason: CreateOrDropClusterReplicaReasonV1::System,
700                    scheduling_policies: None,
701                },
702            ),
703        ));
704    }
705
706    let system_privileges = [MzAclItem {
707        grantee: MZ_SYSTEM_ROLE_ID,
708        grantor: MZ_SYSTEM_ROLE_ID,
709        acl_mode: rbac::all_object_privileges(SystemObjectType::System),
710    }]
711    .into_iter()
712    // Optionally add system privileges for the bootstrap role.
713    .chain(bootstrap_role.as_ref().map(|role| MzAclItem {
714        grantee: role.id.clone(),
715        grantor: MZ_SYSTEM_ROLE_ID,
716        acl_mode: rbac::all_object_privileges(SystemObjectType::System),
717    }));
718    tx.set_system_privileges(system_privileges.clone().collect())?;
719    for system_privilege in system_privileges {
720        audit_events.push((
721            mz_audit_log::EventType::Grant,
722            mz_audit_log::ObjectType::System,
723            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
724                object_id: "SYSTEM".to_string(),
725                grantee_id: system_privilege.grantee.to_string(),
726                grantor_id: system_privilege.grantor.to_string(),
727                privileges: system_privilege.acl_mode.to_string(),
728            }),
729        ));
730    }
731
732    // Allocate an ID for each audit log event.
733    let mut audit_events_with_id = Vec::with_capacity(audit_events.len());
734    for (ty, obj, details) in audit_events {
735        let id = tx.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())?;
736        audit_events_with_id.push((id, ty, obj, details));
737    }
738
739    for (id, event_type, object_type, details) in audit_events_with_id {
740        tx.insert_audit_log_event(VersionedEvent::V1(EventV1 {
741            id,
742            event_type,
743            object_type,
744            details,
745            user: None,
746            occurred_at: initial_ts,
747        }));
748    }
749
750    for (key, value) in [
751        (USER_VERSION_KEY.to_string(), CATALOG_VERSION),
752        (SYSTEM_CONFIG_SYNCED_KEY.to_string(), 0),
753    ] {
754        tx.insert_config(key, value)?;
755    }
756
757    for (name, value) in [
758        (
759            CATALOG_CONTENT_VERSION_KEY.to_string(),
760            catalog_content_version,
761        ),
762        (
763            BUILTIN_MIGRATION_SHARD_KEY.to_string(),
764            ShardId::new().to_string(),
765        ),
766        (
767            EXPRESSION_CACHE_SHARD_KEY.to_string(),
768            ShardId::new().to_string(),
769        ),
770    ] {
771        tx.set_setting(name, Some(value))?;
772    }
773
774    Ok(())
775}
776
777pub fn resolve_system_schema(name: &str) -> &Schema {
778    SYSTEM_SCHEMAS
779        .get(name)
780        .unwrap_or_else(|| panic!("unable to resolve system schema: {name}"))
781}
782
783/// Defines the default config for a Cluster.
784fn default_cluster_config(args: &BootstrapArgs) -> Result<ClusterConfig, CatalogError> {
785    let cluster_size = args.default_cluster_replica_size.to_string();
786    let cluster_allocation = args
787        .cluster_replica_size_map
788        .get_allocation_by_name(&cluster_size)?;
789    Ok(ClusterConfig {
790        variant: ClusterVariant::Managed(ClusterVariantManaged {
791            size: cluster_size,
792            replication_factor: args.default_cluster_replication_factor,
793            availability_zones: vec![],
794            logging: ReplicaLogging {
795                log_logging: false,
796                interval: Some(Duration::from_secs(1)),
797            },
798            disk: cluster_allocation.is_cc,
799            optimizer_feature_overrides: Default::default(),
800            schedule: Default::default(),
801        }),
802        workload_class: None,
803    })
804}
805
806/// Defines the default config for a Cluster Replica.
807fn default_replica_config(args: &BootstrapArgs) -> Result<ReplicaConfig, CatalogError> {
808    let cluster_size = args.default_cluster_replica_size.to_string();
809    let cluster_allocation = args
810        .cluster_replica_size_map
811        .get_allocation_by_name(&cluster_size)?;
812    Ok(ReplicaConfig {
813        location: ReplicaLocation::Managed {
814            size: cluster_size,
815            availability_zone: None,
816            disk: cluster_allocation.is_cc,
817            internal: false,
818            billed_as: None,
819            pending: false,
820        },
821        logging: ReplicaLogging {
822            log_logging: false,
823            interval: Some(Duration::from_secs(1)),
824        },
825    })
826}