Skip to main content

mz_catalog/durable/
initialize.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::iter;
12use std::str::FromStr;
13use std::sync::LazyLock;
14use std::time::Duration;
15
16use base64::prelude::*;
17use ipnet::IpNet;
18use itertools::max;
19use mz_audit_log::{CreateOrDropClusterReplicaReasonV1, EventV1, VersionedEvent};
20use mz_controller::clusters::ReplicaLogging;
21use mz_controller_types::{ClusterId, ReplicaId};
22use mz_ore::collections::HashSet;
23use mz_ore::now::EpochMillis;
24use mz_persist_types::ShardId;
25use mz_pgrepr::oid::{
26    FIRST_USER_OID, NETWORK_POLICIES_DEFAULT_POLICY_OID, ROLE_PUBLIC_OID,
27    SCHEMA_INFORMATION_SCHEMA_OID, SCHEMA_MZ_CATALOG_OID, SCHEMA_MZ_CATALOG_UNSTABLE_OID,
28    SCHEMA_MZ_INTERNAL_OID, SCHEMA_MZ_INTROSPECTION_OID, SCHEMA_MZ_UNSAFE_OID,
29    SCHEMA_PG_CATALOG_OID,
30};
31use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
32use mz_repr::network_policy_id::NetworkPolicyId;
33use mz_repr::role_id::RoleId;
34use mz_sql::catalog::{
35    DefaultPrivilegeAclItem, DefaultPrivilegeObject, ObjectType, RoleAttributesRaw, RoleMembership,
36    RoleVars, SystemObjectType,
37};
38use mz_sql::names::{
39    DatabaseId, ObjectId, PUBLIC_ROLE_NAME, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier,
40};
41use mz_sql::plan::{NetworkPolicyRule, PolicyAddress};
42use mz_sql::rbac;
43use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
44
45use crate::builtin::BUILTIN_ROLES;
46use crate::durable::upgrade::CATALOG_VERSION;
47use crate::durable::{
48    AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, BootstrapArgs,
49    CATALOG_CONTENT_VERSION_KEY, CatalogError, ClusterConfig, ClusterVariant,
50    ClusterVariantManaged, DATABASE_ID_ALLOC_KEY, DefaultPrivilege, EXPRESSION_CACHE_SHARD_KEY,
51    MOCK_AUTHENTICATION_NONCE_KEY, OID_ALLOC_KEY, ReplicaConfig, ReplicaLocation, Role,
52    SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY,
53    SYSTEM_REPLICA_ID_ALLOC_KEY, Schema, Transaction, USER_CLUSTER_ID_ALLOC_KEY,
54    USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY,
55};
56
/// The key within the "config" collection that stores the version of the catalog.
pub const USER_VERSION_KEY: &str = "user_version";
/// The key within the "config" collection that stores whether the remote configuration was
/// synchronized at least once.
pub(crate) const SYSTEM_CONFIG_SYNCED_KEY: &str = "system_config_synced";

/// The key used within the "config" collection where we store a mirror of the
/// `with_0dt_deployment_max_wait` "system var" value. This is mirrored so that
/// we can toggle the flag with LaunchDarkly, but use it in boot before
/// LaunchDarkly is available.
///
/// NOTE: Weird prefix because we can't start with a `0`.
pub(crate) const WITH_0DT_DEPLOYMENT_MAX_WAIT: &str = "with_0dt_deployment_max_wait";

/// The key used within the "config" collection where we store a mirror of the
/// `with_0dt_deployment_ddl_check_interval` "system var" value. This is
/// mirrored so that we can toggle the flag with LaunchDarkly, but use it in
/// boot before LaunchDarkly is available.
///
/// NOTE: Weird prefix because we can't start with a `0`.
pub(crate) const WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL: &str =
    "with_0dt_deployment_ddl_check_interval";

/// The key used within the "config" collection where we store a mirror of the
/// `enable_0dt_deployment_panic_after_timeout` "system var" value. This is
/// mirrored so that we can toggle the flag with LaunchDarkly, but use it in
/// boot before LaunchDarkly is available.
pub(crate) const ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT: &str =
    "enable_0dt_deployment_panic_after_timeout";
86
87const USER_ID_ALLOC_KEY: &str = "user";
88const SYSTEM_ID_ALLOC_KEY: &str = "system";
89
90const DEFAULT_USER_CLUSTER_ID: ClusterId = ClusterId::User(1);
91const DEFAULT_USER_CLUSTER_NAME: &str = "quickstart";
92
93const DEFAULT_USER_REPLICA_ID: ReplicaId = ReplicaId::User(1);
94
95const MATERIALIZE_DATABASE_ID_VAL: u64 = 1;
96const MATERIALIZE_DATABASE_ID: DatabaseId = DatabaseId::User(MATERIALIZE_DATABASE_ID_VAL);
97
98const MZ_CATALOG_SCHEMA_ID: u64 = 1;
99const PG_CATALOG_SCHEMA_ID: u64 = 2;
100const PUBLIC_SCHEMA_ID: u64 = 3;
101const MZ_INTERNAL_SCHEMA_ID: u64 = 4;
102const INFORMATION_SCHEMA_ID: u64 = 5;
103pub const MZ_UNSAFE_SCHEMA_ID: u64 = 6;
104pub const MZ_CATALOG_UNSTABLE_SCHEMA_ID: u64 = 7;
105pub const MZ_INTROSPECTION_SCHEMA_ID: u64 = 8;
106
107const DEFAULT_ALLOCATOR_ID: u64 = 1;
108
109pub const DEFAULT_USER_NETWORK_POLICY_ID: NetworkPolicyId = NetworkPolicyId::User(1);
110pub const DEFAULT_USER_NETWORK_POLICY_NAME: &str = "default";
111pub const DEFAULT_USER_NETWORK_POLICY_RULES: &[(
112    &str,
113    mz_sql::plan::NetworkPolicyRuleAction,
114    mz_sql::plan::NetworkPolicyRuleDirection,
115    &str,
116)] = &[(
117    "open_ingress",
118    mz_sql::plan::NetworkPolicyRuleAction::Allow,
119    mz_sql::plan::NetworkPolicyRuleDirection::Ingress,
120    "0.0.0.0/0",
121)];
122
123static DEFAULT_USER_NETWORK_POLICY_PRIVILEGES: LazyLock<Vec<MzAclItem>> = LazyLock::new(|| {
124    vec![rbac::owner_privilege(
125        ObjectType::NetworkPolicy,
126        MZ_SYSTEM_ROLE_ID,
127    )]
128});
129
130static SYSTEM_SCHEMA_PRIVILEGES: LazyLock<Vec<MzAclItem>> = LazyLock::new(|| {
131    vec![
132        rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Schema),
133        MzAclItem {
134            grantee: MZ_SUPPORT_ROLE_ID,
135            grantor: MZ_SYSTEM_ROLE_ID,
136            acl_mode: AclMode::USAGE,
137        },
138        rbac::owner_privilege(mz_sql::catalog::ObjectType::Schema, MZ_SYSTEM_ROLE_ID),
139    ]
140});
141
142static MZ_CATALOG_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
143    id: SchemaId::System(MZ_CATALOG_SCHEMA_ID),
144    oid: SCHEMA_MZ_CATALOG_OID,
145    database_id: None,
146    name: "mz_catalog".to_string(),
147    owner_id: MZ_SYSTEM_ROLE_ID,
148    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
149});
150static PG_CATALOG_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
151    id: SchemaId::System(PG_CATALOG_SCHEMA_ID),
152    oid: SCHEMA_PG_CATALOG_OID,
153    database_id: None,
154    name: "pg_catalog".to_string(),
155    owner_id: MZ_SYSTEM_ROLE_ID,
156    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
157});
158static MZ_INTERNAL_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
159    id: SchemaId::System(MZ_INTERNAL_SCHEMA_ID),
160    oid: SCHEMA_MZ_INTERNAL_OID,
161    database_id: None,
162    name: "mz_internal".to_string(),
163    owner_id: MZ_SYSTEM_ROLE_ID,
164    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
165});
166static INFORMATION_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
167    id: SchemaId::System(INFORMATION_SCHEMA_ID),
168    oid: SCHEMA_INFORMATION_SCHEMA_OID,
169    database_id: None,
170    name: "information_schema".to_string(),
171    owner_id: MZ_SYSTEM_ROLE_ID,
172    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
173});
174static MZ_UNSAFE_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
175    id: SchemaId::System(MZ_UNSAFE_SCHEMA_ID),
176    oid: SCHEMA_MZ_UNSAFE_OID,
177    database_id: None,
178    name: "mz_unsafe".to_string(),
179    owner_id: MZ_SYSTEM_ROLE_ID,
180    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
181});
182static MZ_CATALOG_UNSTABLE_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
183    id: SchemaId::System(MZ_CATALOG_UNSTABLE_SCHEMA_ID),
184    oid: SCHEMA_MZ_CATALOG_UNSTABLE_OID,
185    database_id: None,
186    name: "mz_catalog_unstable".to_string(),
187    owner_id: MZ_SYSTEM_ROLE_ID,
188    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
189});
190static MZ_INTROSPECTION_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
191    id: SchemaId::System(MZ_INTROSPECTION_SCHEMA_ID),
192    oid: SCHEMA_MZ_INTROSPECTION_OID,
193    database_id: None,
194    name: "mz_introspection".to_string(),
195    owner_id: MZ_SYSTEM_ROLE_ID,
196    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
197});
198static SYSTEM_SCHEMAS: LazyLock<BTreeMap<&str, &Schema>> = LazyLock::new(|| {
199    [
200        &*MZ_CATALOG_SCHEMA,
201        &*PG_CATALOG_SCHEMA,
202        &*MZ_INTERNAL_SCHEMA,
203        &*INFORMATION_SCHEMA,
204        &*MZ_UNSAFE_SCHEMA,
205        &*MZ_CATALOG_UNSTABLE_SCHEMA,
206        &*MZ_INTROSPECTION_SCHEMA,
207    ]
208    .into_iter()
209    .map(|s| (&*s.name, s))
210    .collect()
211});
212
213/// Initializes the Catalog with some default objects.
214#[mz_ore::instrument]
215pub(crate) async fn initialize(
216    tx: &mut Transaction<'_>,
217    options: &BootstrapArgs,
218    initial_ts: EpochMillis,
219    catalog_content_version: String,
220) -> Result<(), CatalogError> {
221    // Collect audit events so we can commit them once at the very end.
222    let mut audit_events = vec![];
223
224    for (name, next_id) in [
225        (USER_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
226        (SYSTEM_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
227        (
228            DATABASE_ID_ALLOC_KEY.to_string(),
229            MATERIALIZE_DATABASE_ID_VAL + 1,
230        ),
231        (
232            SCHEMA_ID_ALLOC_KEY.to_string(),
233            max(&[
234                MZ_CATALOG_SCHEMA_ID,
235                PG_CATALOG_SCHEMA_ID,
236                PUBLIC_SCHEMA_ID,
237                MZ_INTERNAL_SCHEMA_ID,
238                INFORMATION_SCHEMA_ID,
239                MZ_UNSAFE_SCHEMA_ID,
240                MZ_CATALOG_UNSTABLE_SCHEMA_ID,
241                MZ_INTROSPECTION_SCHEMA_ID,
242            ])
243            .expect("known to be non-empty")
244                + 1,
245        ),
246        (USER_ROLE_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
247        (
248            USER_CLUSTER_ID_ALLOC_KEY.to_string(),
249            DEFAULT_USER_CLUSTER_ID.inner_id() + 1,
250        ),
251        (
252            SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string(),
253            DEFAULT_ALLOCATOR_ID,
254        ),
255        (
256            USER_REPLICA_ID_ALLOC_KEY.to_string(),
257            DEFAULT_USER_REPLICA_ID.inner_id()
258                + u64::from(options.default_cluster_replication_factor),
259        ),
260        (
261            SYSTEM_REPLICA_ID_ALLOC_KEY.to_string(),
262            DEFAULT_ALLOCATOR_ID,
263        ),
264        (
265            USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string(),
266            DEFAULT_ALLOCATOR_ID,
267        ),
268        (AUDIT_LOG_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
269        (STORAGE_USAGE_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
270        (OID_ALLOC_KEY.to_string(), FIRST_USER_OID.into()),
271    ] {
272        tx.insert_id_allocator(name, next_id)?;
273    }
274
275    for role in BUILTIN_ROLES {
276        tx.insert_builtin_role(
277            role.id,
278            role.name.to_string(),
279            role.attributes.clone(),
280            RoleMembership::new(),
281            RoleVars::default(),
282            role.oid,
283        )?;
284    }
285    tx.insert_builtin_role(
286        RoleId::Public,
287        PUBLIC_ROLE_NAME.as_str().to_lowercase(),
288        RoleAttributesRaw::new(),
289        RoleMembership::new(),
290        RoleVars::default(),
291        ROLE_PUBLIC_OID,
292    )?;
293
294    // If provided, generate a new Id for the bootstrap role.
295    let bootstrap_role = if let Some(role) = &options.bootstrap_role {
296        let attributes = RoleAttributesRaw::new();
297        let membership = RoleMembership::new();
298        let vars = RoleVars::default();
299
300        let (id, oid) = tx.insert_user_role(
301            role.to_string(),
302            attributes.clone(),
303            membership.clone(),
304            vars.clone(),
305            &HashSet::new(),
306        )?;
307
308        audit_events.push((
309            mz_audit_log::EventType::Create,
310            mz_audit_log::ObjectType::Role,
311            mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
312                id: id.to_string(),
313                name: role.to_string(),
314            }),
315        ));
316
317        Some(Role {
318            id,
319            name: role.to_string(),
320            attributes: attributes.into(),
321            membership,
322            vars,
323            oid,
324        })
325    } else {
326        None
327    };
328
329    let default_privileges = [
330        // mz_support needs USAGE privileges on all clusters, databases, and schemas for
331        // debugging.
332        DefaultPrivilege {
333            object: DefaultPrivilegeObject {
334                role_id: RoleId::Public,
335                database_id: None,
336                schema_id: None,
337                object_type: mz_sql::catalog::ObjectType::Cluster,
338            },
339            acl_item: DefaultPrivilegeAclItem {
340                grantee: MZ_SUPPORT_ROLE_ID,
341                acl_mode: AclMode::USAGE,
342            },
343        },
344        DefaultPrivilege {
345            object: DefaultPrivilegeObject {
346                role_id: RoleId::Public,
347                database_id: None,
348                schema_id: None,
349                object_type: mz_sql::catalog::ObjectType::Database,
350            },
351            acl_item: DefaultPrivilegeAclItem {
352                grantee: MZ_SUPPORT_ROLE_ID,
353                acl_mode: AclMode::USAGE,
354            },
355        },
356        DefaultPrivilege {
357            object: DefaultPrivilegeObject {
358                role_id: RoleId::Public,
359                database_id: None,
360                schema_id: None,
361                object_type: mz_sql::catalog::ObjectType::Schema,
362            },
363            acl_item: DefaultPrivilegeAclItem {
364                grantee: MZ_SUPPORT_ROLE_ID,
365                acl_mode: AclMode::USAGE,
366            },
367        },
368        DefaultPrivilege {
369            object: DefaultPrivilegeObject {
370                role_id: RoleId::Public,
371                database_id: None,
372                schema_id: None,
373                object_type: mz_sql::catalog::ObjectType::Type,
374            },
375            acl_item: DefaultPrivilegeAclItem {
376                grantee: RoleId::Public,
377                acl_mode: AclMode::USAGE,
378            },
379        },
380    ];
381    tx.set_default_privileges(default_privileges.to_vec())?;
382    for DefaultPrivilege { object, acl_item } in default_privileges {
383        let object_type = match object.object_type {
384            ObjectType::Table => mz_audit_log::ObjectType::Table,
385            ObjectType::View => mz_audit_log::ObjectType::View,
386            ObjectType::MaterializedView => mz_audit_log::ObjectType::MaterializedView,
387            ObjectType::Source => mz_audit_log::ObjectType::Source,
388            ObjectType::Sink => mz_audit_log::ObjectType::Sink,
389            ObjectType::Index => mz_audit_log::ObjectType::Index,
390            ObjectType::Type => mz_audit_log::ObjectType::Type,
391            ObjectType::Role => mz_audit_log::ObjectType::Role,
392            ObjectType::Cluster => mz_audit_log::ObjectType::Cluster,
393            ObjectType::ClusterReplica => mz_audit_log::ObjectType::ClusterReplica,
394            ObjectType::Secret => mz_audit_log::ObjectType::Secret,
395            ObjectType::Connection => mz_audit_log::ObjectType::Connection,
396            ObjectType::Database => mz_audit_log::ObjectType::Database,
397            ObjectType::Schema => mz_audit_log::ObjectType::Schema,
398            ObjectType::Func => mz_audit_log::ObjectType::Func,
399            ObjectType::NetworkPolicy => mz_audit_log::ObjectType::NetworkPolicy,
400        };
401        audit_events.push((
402            mz_audit_log::EventType::Grant,
403            object_type,
404            mz_audit_log::EventDetails::AlterDefaultPrivilegeV1(
405                mz_audit_log::AlterDefaultPrivilegeV1 {
406                    role_id: object.role_id.to_string(),
407                    database_id: object.database_id.map(|id| id.to_string()),
408                    schema_id: object.schema_id.map(|id| id.to_string()),
409                    grantee_id: acl_item.grantee.to_string(),
410                    privileges: acl_item.acl_mode.to_string(),
411                },
412            ),
413        ));
414    }
415
416    let mut db_privileges = vec![
417        MzAclItem {
418            grantee: RoleId::Public,
419            grantor: MZ_SYSTEM_ROLE_ID,
420            acl_mode: AclMode::USAGE,
421        },
422        MzAclItem {
423            grantee: MZ_SUPPORT_ROLE_ID,
424            grantor: MZ_SYSTEM_ROLE_ID,
425            acl_mode: AclMode::USAGE,
426        },
427        rbac::owner_privilege(mz_sql::catalog::ObjectType::Database, MZ_SYSTEM_ROLE_ID),
428    ];
429    // Optionally add a privilege for the bootstrap role.
430    if let Some(role) = &bootstrap_role {
431        db_privileges.push(MzAclItem {
432            grantee: role.id.clone(),
433            grantor: MZ_SYSTEM_ROLE_ID,
434            acl_mode: rbac::all_object_privileges(SystemObjectType::Object(
435                mz_sql::catalog::ObjectType::Database,
436            )),
437        })
438    };
439
440    let materialize_db_oid = tx.allocate_oid(&HashSet::new())?;
441    tx.insert_database(
442        MATERIALIZE_DATABASE_ID,
443        "materialize",
444        MZ_SYSTEM_ROLE_ID,
445        db_privileges,
446        materialize_db_oid,
447    )?;
448    audit_events.extend([
449        (
450            mz_audit_log::EventType::Create,
451            mz_audit_log::ObjectType::Database,
452            mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
453                id: MATERIALIZE_DATABASE_ID.to_string(),
454                name: "materialize".to_string(),
455            }),
456        ),
457        (
458            mz_audit_log::EventType::Grant,
459            mz_audit_log::ObjectType::Database,
460            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
461                object_id: ObjectId::Database(MATERIALIZE_DATABASE_ID).to_string(),
462                grantee_id: RoleId::Public.to_string(),
463                grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
464                privileges: AclMode::USAGE.to_string(),
465            }),
466        ),
467    ]);
468    // Optionally add a privilege for the bootstrap role.
469    if let Some(role) = &bootstrap_role {
470        let role_id: RoleId = role.id.clone();
471        audit_events.push((
472            mz_audit_log::EventType::Grant,
473            mz_audit_log::ObjectType::Database,
474            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
475                object_id: ObjectId::Database(MATERIALIZE_DATABASE_ID).to_string(),
476                grantee_id: role_id.to_string(),
477                grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
478                privileges: rbac::all_object_privileges(SystemObjectType::Object(
479                    mz_sql::catalog::ObjectType::Database,
480                ))
481                .to_string(),
482            }),
483        ));
484    }
485
486    let public_schema_oid = tx.allocate_oid(&HashSet::new())?;
487    let public_schema = Schema {
488        id: SchemaId::User(PUBLIC_SCHEMA_ID),
489        oid: public_schema_oid,
490        database_id: Some(MATERIALIZE_DATABASE_ID),
491        name: "public".to_string(),
492        owner_id: MZ_SYSTEM_ROLE_ID,
493        privileges: vec![
494            MzAclItem {
495                grantee: RoleId::Public,
496                grantor: MZ_SYSTEM_ROLE_ID,
497                acl_mode: AclMode::USAGE,
498            },
499            MzAclItem {
500                grantee: MZ_SUPPORT_ROLE_ID,
501                grantor: MZ_SYSTEM_ROLE_ID,
502                acl_mode: AclMode::USAGE,
503            },
504            rbac::owner_privilege(mz_sql::catalog::ObjectType::Schema, MZ_SYSTEM_ROLE_ID),
505        ]
506        .into_iter()
507        // Optionally add the bootstrap role to the public schema.
508        .chain(bootstrap_role.as_ref().map(|role| MzAclItem {
509            grantee: role.id.clone(),
510            grantor: MZ_SYSTEM_ROLE_ID,
511            acl_mode: rbac::all_object_privileges(SystemObjectType::Object(
512                mz_sql::catalog::ObjectType::Schema,
513            )),
514        }))
515        .collect(),
516    };
517
518    for schema in SYSTEM_SCHEMAS.values().chain(iter::once(&&public_schema)) {
519        tx.insert_schema(
520            schema.id,
521            schema.database_id,
522            schema.name.clone(),
523            schema.owner_id,
524            schema.privileges.clone(),
525            schema.oid,
526        )?;
527    }
528    audit_events.push((
529        mz_audit_log::EventType::Create,
530        mz_audit_log::ObjectType::Schema,
531        mz_audit_log::EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
532            id: PUBLIC_SCHEMA_ID.to_string(),
533            name: "public".to_string(),
534            database_name: Some("materialize".to_string()),
535        }),
536    ));
537    if let Some(role) = &bootstrap_role {
538        let role_id: RoleId = role.id.clone();
539        audit_events.push((
540            mz_audit_log::EventType::Grant,
541            mz_audit_log::ObjectType::Schema,
542            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
543                object_id: ObjectId::Schema((
544                    ResolvedDatabaseSpecifier::Id(MATERIALIZE_DATABASE_ID),
545                    SchemaSpecifier::Id(SchemaId::User(PUBLIC_SCHEMA_ID)),
546                ))
547                .to_string(),
548                grantee_id: role_id.to_string(),
549                grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
550                privileges: rbac::all_object_privileges(SystemObjectType::Object(
551                    mz_sql::catalog::ObjectType::Schema,
552                ))
553                .to_string(),
554            }),
555        ));
556    }
557
558    let mut cluster_privileges = vec![
559        MzAclItem {
560            grantee: RoleId::Public,
561            grantor: MZ_SYSTEM_ROLE_ID,
562            acl_mode: AclMode::USAGE,
563        },
564        MzAclItem {
565            grantee: MZ_SUPPORT_ROLE_ID,
566            grantor: MZ_SYSTEM_ROLE_ID,
567            acl_mode: AclMode::USAGE,
568        },
569        rbac::owner_privilege(mz_sql::catalog::ObjectType::Cluster, MZ_SYSTEM_ROLE_ID),
570    ];
571
572    // Optionally add a privilege for the bootstrap role.
573    if let Some(role) = &bootstrap_role {
574        cluster_privileges.push(MzAclItem {
575            grantee: role.id.clone(),
576            grantor: MZ_SYSTEM_ROLE_ID,
577            acl_mode: rbac::all_object_privileges(SystemObjectType::Object(
578                mz_sql::catalog::ObjectType::Cluster,
579            )),
580        });
581    };
582
583    tx.insert_network_policy(
584        DEFAULT_USER_NETWORK_POLICY_ID,
585        DEFAULT_USER_NETWORK_POLICY_NAME.to_string(),
586        DEFAULT_USER_NETWORK_POLICY_RULES
587            .into_iter()
588            .map(|(name, action, direction, ip_str)| NetworkPolicyRule {
589                name: name.to_string(),
590                action: action.clone(),
591                direction: direction.clone(),
592                address: PolicyAddress(
593                    IpNet::from_str(ip_str).expect("default policy must provide valid ip"),
594                ),
595            })
596            .collect::<Vec<NetworkPolicyRule>>(),
597        DEFAULT_USER_NETWORK_POLICY_PRIVILEGES.clone(),
598        MZ_SYSTEM_ROLE_ID,
599        NETWORK_POLICIES_DEFAULT_POLICY_OID,
600    )?;
601    // We created a network policy with a prefined ID user(1) and OID. We need
602    // to increment the id alloc key. It should be safe to assume that there's
603    // no user(1), as a sanity check, we'll assert this is the case.
604    let id = tx.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
605    assert!(DEFAULT_USER_NETWORK_POLICY_ID == NetworkPolicyId::User(id));
606
607    audit_events.extend([(
608        mz_audit_log::EventType::Create,
609        mz_audit_log::ObjectType::NetworkPolicy,
610        mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
611            id: DEFAULT_USER_NETWORK_POLICY_ID.to_string(),
612            name: DEFAULT_USER_NETWORK_POLICY_NAME.to_string(),
613        }),
614    )]);
615
616    tx.insert_user_cluster(
617        DEFAULT_USER_CLUSTER_ID,
618        DEFAULT_USER_CLUSTER_NAME,
619        Vec::new(),
620        MZ_SYSTEM_ROLE_ID,
621        cluster_privileges,
622        default_cluster_config(options)?,
623        &HashSet::new(),
624    )?;
625    audit_events.extend([
626        (
627            mz_audit_log::EventType::Create,
628            mz_audit_log::ObjectType::Cluster,
629            mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
630                id: DEFAULT_USER_CLUSTER_ID.to_string(),
631                name: DEFAULT_USER_CLUSTER_NAME.to_string(),
632            }),
633        ),
634        (
635            mz_audit_log::EventType::Grant,
636            mz_audit_log::ObjectType::Cluster,
637            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
638                object_id: ObjectId::Cluster(DEFAULT_USER_CLUSTER_ID).to_string(),
639                grantee_id: RoleId::Public.to_string(),
640                grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
641                privileges: AclMode::USAGE.to_string(),
642            }),
643        ),
644    ]);
645
646    // Optionally add a privilege for the bootstrap role.
647    if let Some(role) = &bootstrap_role {
648        let role_id: RoleId = role.id.clone();
649        audit_events.push((
650            mz_audit_log::EventType::Grant,
651            mz_audit_log::ObjectType::Cluster,
652            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
653                object_id: ObjectId::Cluster(DEFAULT_USER_CLUSTER_ID).to_string(),
654                grantee_id: role_id.to_string(),
655                grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
656                privileges: rbac::all_object_privileges(SystemObjectType::Object(
657                    mz_sql::catalog::ObjectType::Cluster,
658                ))
659                .to_string(),
660            }),
661        ));
662    }
663
664    for i in 0..options.default_cluster_replication_factor {
665        let replica_id = ReplicaId::User(DEFAULT_USER_REPLICA_ID.inner_id() + u64::from(i));
666        let replica_name = format!("r{}", i + 1);
667        tx.insert_cluster_replica_with_id(
668            DEFAULT_USER_CLUSTER_ID,
669            replica_id,
670            &replica_name,
671            default_replica_config(options)?,
672            MZ_SYSTEM_ROLE_ID,
673        )?;
674        audit_events.push((
675            mz_audit_log::EventType::Create,
676            mz_audit_log::ObjectType::ClusterReplica,
677            mz_audit_log::EventDetails::CreateClusterReplicaV4(
678                mz_audit_log::CreateClusterReplicaV4 {
679                    cluster_id: DEFAULT_USER_CLUSTER_ID.to_string(),
680                    cluster_name: DEFAULT_USER_CLUSTER_NAME.to_string(),
681                    replica_name,
682                    replica_id: Some(replica_id.to_string()),
683                    logical_size: options.default_cluster_replica_size.to_string(),
684                    billed_as: None,
685                    internal: false,
686                    reason: CreateOrDropClusterReplicaReasonV1::System,
687                    scheduling_policies: None,
688                },
689            ),
690        ));
691    }
692
693    let system_privileges = [MzAclItem {
694        grantee: MZ_SYSTEM_ROLE_ID,
695        grantor: MZ_SYSTEM_ROLE_ID,
696        acl_mode: rbac::all_object_privileges(SystemObjectType::System),
697    }]
698    .into_iter()
699    // Optionally add system privileges for the bootstrap role.
700    .chain(bootstrap_role.as_ref().map(|role| MzAclItem {
701        grantee: role.id.clone(),
702        grantor: MZ_SYSTEM_ROLE_ID,
703        acl_mode: rbac::all_object_privileges(SystemObjectType::System),
704    }));
705    tx.set_system_privileges(system_privileges.clone().collect())?;
706    for system_privilege in system_privileges {
707        audit_events.push((
708            mz_audit_log::EventType::Grant,
709            mz_audit_log::ObjectType::System,
710            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
711                object_id: "SYSTEM".to_string(),
712                grantee_id: system_privilege.grantee.to_string(),
713                grantor_id: system_privilege.grantor.to_string(),
714                privileges: system_privilege.acl_mode.to_string(),
715            }),
716        ));
717    }
718
719    // Allocate an ID for each audit log event.
720    let mut audit_events_with_id = Vec::with_capacity(audit_events.len());
721    for (ty, obj, details) in audit_events {
722        let id = tx.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())?;
723        audit_events_with_id.push((id, ty, obj, details));
724    }
725
726    for (id, event_type, object_type, details) in audit_events_with_id {
727        tx.insert_audit_log_event(VersionedEvent::V1(EventV1 {
728            id,
729            event_type,
730            object_type,
731            details,
732            user: None,
733            occurred_at: initial_ts,
734        }));
735    }
736
737    for (key, value) in [
738        (USER_VERSION_KEY.to_string(), CATALOG_VERSION),
739        (SYSTEM_CONFIG_SYNCED_KEY.to_string(), 0),
740    ] {
741        tx.insert_config(key, value)?;
742    }
743
744    for (name, value) in [
745        (
746            CATALOG_CONTENT_VERSION_KEY.to_string(),
747            catalog_content_version,
748        ),
749        (
750            BUILTIN_MIGRATION_SHARD_KEY.to_string(),
751            ShardId::new().to_string(),
752        ),
753        (
754            EXPRESSION_CACHE_SHARD_KEY.to_string(),
755            ShardId::new().to_string(),
756        ),
757    ] {
758        tx.set_setting(name, Some(value))?;
759    }
760
761    if tx
762        .get_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string())
763        .is_none()
764    {
765        let mut nonce = [0u8; 24];
766        openssl::rand::rand_bytes(&mut nonce).expect("random number generation failed");
767        tx.set_setting(
768            MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
769            Some(BASE64_STANDARD.encode(nonce)),
770        )?;
771    }
772
773    Ok(())
774}
775
776pub fn resolve_system_schema(name: &str) -> &Schema {
777    SYSTEM_SCHEMAS
778        .get(name)
779        .unwrap_or_else(|| panic!("unable to resolve system schema: {name}"))
780}
781
782/// Defines the default config for a Cluster.
783fn default_cluster_config(args: &BootstrapArgs) -> Result<ClusterConfig, CatalogError> {
784    Ok(ClusterConfig {
785        variant: ClusterVariant::Managed(ClusterVariantManaged {
786            size: args.default_cluster_replica_size.to_string(),
787            replication_factor: args.default_cluster_replication_factor,
788            availability_zones: vec![],
789            logging: ReplicaLogging {
790                log_logging: false,
791                interval: Some(Duration::from_secs(1)),
792            },
793            optimizer_feature_overrides: Default::default(),
794            schedule: Default::default(),
795        }),
796        workload_class: None,
797    })
798}
799
800/// Defines the default config for a Cluster Replica.
801fn default_replica_config(args: &BootstrapArgs) -> Result<ReplicaConfig, CatalogError> {
802    Ok(ReplicaConfig {
803        location: ReplicaLocation::Managed {
804            size: args.default_cluster_replica_size.to_string(),
805            availability_zone: None,
806            internal: false,
807            billed_as: None,
808            pending: false,
809        },
810        logging: ReplicaLogging {
811            log_logging: false,
812            interval: Some(Duration::from_secs(1)),
813        },
814    })
815}