1use std::collections::BTreeMap;
11use std::iter;
12use std::str::FromStr;
13use std::sync::LazyLock;
14use std::time::Duration;
15
16use base64::prelude::*;
17use ipnet::IpNet;
18use itertools::max;
19use mz_audit_log::{CreateOrDropClusterReplicaReasonV1, EventV1, VersionedEvent};
20use mz_controller::clusters::ReplicaLogging;
21use mz_controller_types::{ClusterId, ReplicaId};
22use mz_ore::collections::HashSet;
23use mz_ore::now::EpochMillis;
24use mz_persist_types::ShardId;
25use mz_pgrepr::oid::{
26 FIRST_USER_OID, NETWORK_POLICIES_DEFAULT_POLICY_OID, ROLE_PUBLIC_OID,
27 SCHEMA_INFORMATION_SCHEMA_OID, SCHEMA_MZ_CATALOG_OID, SCHEMA_MZ_CATALOG_UNSTABLE_OID,
28 SCHEMA_MZ_INTERNAL_OID, SCHEMA_MZ_INTROSPECTION_OID, SCHEMA_MZ_UNSAFE_OID,
29 SCHEMA_PG_CATALOG_OID,
30};
31use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
32use mz_repr::network_policy_id::NetworkPolicyId;
33use mz_repr::role_id::RoleId;
34use mz_sql::catalog::{
35 DefaultPrivilegeAclItem, DefaultPrivilegeObject, ObjectType, RoleAttributesRaw, RoleMembership,
36 RoleVars, SystemObjectType,
37};
38use mz_sql::names::{
39 DatabaseId, ObjectId, PUBLIC_ROLE_NAME, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier,
40};
41use mz_sql::plan::{NetworkPolicyRule, PolicyAddress};
42use mz_sql::rbac;
43use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
44
45use crate::builtin::BUILTIN_ROLES;
46use crate::durable::upgrade::CATALOG_VERSION;
47use crate::durable::{
48 AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, BootstrapArgs,
49 CATALOG_CONTENT_VERSION_KEY, CatalogError, ClusterConfig, ClusterVariant,
50 ClusterVariantManaged, DATABASE_ID_ALLOC_KEY, DefaultPrivilege, EXPRESSION_CACHE_SHARD_KEY,
51 MOCK_AUTHENTICATION_NONCE_KEY, OID_ALLOC_KEY, ReplicaConfig, ReplicaLocation, Role,
52 SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY,
53 SYSTEM_REPLICA_ID_ALLOC_KEY, Schema, Transaction, USER_CLUSTER_ID_ALLOC_KEY,
54 USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY,
55};
56
57pub const USER_VERSION_KEY: &str = "user_version";
59pub(crate) const SYSTEM_CONFIG_SYNCED_KEY: &str = "system_config_synced";
62
63pub(crate) const WITH_0DT_DEPLOYMENT_MAX_WAIT: &str = "with_0dt_deployment_max_wait";
70
71pub(crate) const WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL: &str =
78 "with_0dt_deployment_ddl_check_interval";
79
80pub(crate) const ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT: &str =
85 "enable_0dt_deployment_panic_after_timeout";
86
87const USER_ID_ALLOC_KEY: &str = "user";
88const SYSTEM_ID_ALLOC_KEY: &str = "system";
89
90const DEFAULT_USER_CLUSTER_ID: ClusterId = ClusterId::User(1);
91const DEFAULT_USER_CLUSTER_NAME: &str = "quickstart";
92
93const DEFAULT_USER_REPLICA_ID: ReplicaId = ReplicaId::User(1);
94
95const MATERIALIZE_DATABASE_ID_VAL: u64 = 1;
96const MATERIALIZE_DATABASE_ID: DatabaseId = DatabaseId::User(MATERIALIZE_DATABASE_ID_VAL);
97
98const MZ_CATALOG_SCHEMA_ID: u64 = 1;
99const PG_CATALOG_SCHEMA_ID: u64 = 2;
100const PUBLIC_SCHEMA_ID: u64 = 3;
101const MZ_INTERNAL_SCHEMA_ID: u64 = 4;
102const INFORMATION_SCHEMA_ID: u64 = 5;
103pub const MZ_UNSAFE_SCHEMA_ID: u64 = 6;
104pub const MZ_CATALOG_UNSTABLE_SCHEMA_ID: u64 = 7;
105pub const MZ_INTROSPECTION_SCHEMA_ID: u64 = 8;
106
107const DEFAULT_ALLOCATOR_ID: u64 = 1;
108
109pub const DEFAULT_USER_NETWORK_POLICY_ID: NetworkPolicyId = NetworkPolicyId::User(1);
110pub const DEFAULT_USER_NETWORK_POLICY_NAME: &str = "default";
111pub const DEFAULT_USER_NETWORK_POLICY_RULES: &[(
112 &str,
113 mz_sql::plan::NetworkPolicyRuleAction,
114 mz_sql::plan::NetworkPolicyRuleDirection,
115 &str,
116)] = &[(
117 "open_ingress",
118 mz_sql::plan::NetworkPolicyRuleAction::Allow,
119 mz_sql::plan::NetworkPolicyRuleDirection::Ingress,
120 "0.0.0.0/0",
121)];
122
123static DEFAULT_USER_NETWORK_POLICY_PRIVILEGES: LazyLock<Vec<MzAclItem>> = LazyLock::new(|| {
124 vec![rbac::owner_privilege(
125 ObjectType::NetworkPolicy,
126 MZ_SYSTEM_ROLE_ID,
127 )]
128});
129
130static SYSTEM_SCHEMA_PRIVILEGES: LazyLock<Vec<MzAclItem>> = LazyLock::new(|| {
131 vec![
132 rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Schema),
133 MzAclItem {
134 grantee: MZ_SUPPORT_ROLE_ID,
135 grantor: MZ_SYSTEM_ROLE_ID,
136 acl_mode: AclMode::USAGE,
137 },
138 rbac::owner_privilege(mz_sql::catalog::ObjectType::Schema, MZ_SYSTEM_ROLE_ID),
139 ]
140});
141
142static MZ_CATALOG_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
143 id: SchemaId::System(MZ_CATALOG_SCHEMA_ID),
144 oid: SCHEMA_MZ_CATALOG_OID,
145 database_id: None,
146 name: "mz_catalog".to_string(),
147 owner_id: MZ_SYSTEM_ROLE_ID,
148 privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
149});
150static PG_CATALOG_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
151 id: SchemaId::System(PG_CATALOG_SCHEMA_ID),
152 oid: SCHEMA_PG_CATALOG_OID,
153 database_id: None,
154 name: "pg_catalog".to_string(),
155 owner_id: MZ_SYSTEM_ROLE_ID,
156 privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
157});
158static MZ_INTERNAL_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
159 id: SchemaId::System(MZ_INTERNAL_SCHEMA_ID),
160 oid: SCHEMA_MZ_INTERNAL_OID,
161 database_id: None,
162 name: "mz_internal".to_string(),
163 owner_id: MZ_SYSTEM_ROLE_ID,
164 privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
165});
166static INFORMATION_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
167 id: SchemaId::System(INFORMATION_SCHEMA_ID),
168 oid: SCHEMA_INFORMATION_SCHEMA_OID,
169 database_id: None,
170 name: "information_schema".to_string(),
171 owner_id: MZ_SYSTEM_ROLE_ID,
172 privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
173});
174static MZ_UNSAFE_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
175 id: SchemaId::System(MZ_UNSAFE_SCHEMA_ID),
176 oid: SCHEMA_MZ_UNSAFE_OID,
177 database_id: None,
178 name: "mz_unsafe".to_string(),
179 owner_id: MZ_SYSTEM_ROLE_ID,
180 privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
181});
182static MZ_CATALOG_UNSTABLE_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
183 id: SchemaId::System(MZ_CATALOG_UNSTABLE_SCHEMA_ID),
184 oid: SCHEMA_MZ_CATALOG_UNSTABLE_OID,
185 database_id: None,
186 name: "mz_catalog_unstable".to_string(),
187 owner_id: MZ_SYSTEM_ROLE_ID,
188 privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
189});
190static MZ_INTROSPECTION_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
191 id: SchemaId::System(MZ_INTROSPECTION_SCHEMA_ID),
192 oid: SCHEMA_MZ_INTROSPECTION_OID,
193 database_id: None,
194 name: "mz_introspection".to_string(),
195 owner_id: MZ_SYSTEM_ROLE_ID,
196 privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
197});
198static SYSTEM_SCHEMAS: LazyLock<BTreeMap<&str, &Schema>> = LazyLock::new(|| {
199 [
200 &*MZ_CATALOG_SCHEMA,
201 &*PG_CATALOG_SCHEMA,
202 &*MZ_INTERNAL_SCHEMA,
203 &*INFORMATION_SCHEMA,
204 &*MZ_UNSAFE_SCHEMA,
205 &*MZ_CATALOG_UNSTABLE_SCHEMA,
206 &*MZ_INTROSPECTION_SCHEMA,
207 ]
208 .into_iter()
209 .map(|s| (&*s.name, s))
210 .collect()
211});
212
213#[mz_ore::instrument]
215pub(crate) async fn initialize(
216 tx: &mut Transaction<'_>,
217 options: &BootstrapArgs,
218 initial_ts: EpochMillis,
219 catalog_content_version: String,
220) -> Result<(), CatalogError> {
221 let mut audit_events = vec![];
223
224 for (name, next_id) in [
225 (USER_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
226 (SYSTEM_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
227 (
228 DATABASE_ID_ALLOC_KEY.to_string(),
229 MATERIALIZE_DATABASE_ID_VAL + 1,
230 ),
231 (
232 SCHEMA_ID_ALLOC_KEY.to_string(),
233 max(&[
234 MZ_CATALOG_SCHEMA_ID,
235 PG_CATALOG_SCHEMA_ID,
236 PUBLIC_SCHEMA_ID,
237 MZ_INTERNAL_SCHEMA_ID,
238 INFORMATION_SCHEMA_ID,
239 MZ_UNSAFE_SCHEMA_ID,
240 MZ_CATALOG_UNSTABLE_SCHEMA_ID,
241 MZ_INTROSPECTION_SCHEMA_ID,
242 ])
243 .expect("known to be non-empty")
244 + 1,
245 ),
246 (USER_ROLE_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
247 (
248 USER_CLUSTER_ID_ALLOC_KEY.to_string(),
249 DEFAULT_USER_CLUSTER_ID.inner_id() + 1,
250 ),
251 (
252 SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string(),
253 DEFAULT_ALLOCATOR_ID,
254 ),
255 (
256 USER_REPLICA_ID_ALLOC_KEY.to_string(),
257 DEFAULT_USER_REPLICA_ID.inner_id()
258 + u64::from(options.default_cluster_replication_factor),
259 ),
260 (
261 SYSTEM_REPLICA_ID_ALLOC_KEY.to_string(),
262 DEFAULT_ALLOCATOR_ID,
263 ),
264 (
265 USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string(),
266 DEFAULT_ALLOCATOR_ID,
267 ),
268 (AUDIT_LOG_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
269 (STORAGE_USAGE_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
270 (OID_ALLOC_KEY.to_string(), FIRST_USER_OID.into()),
271 ] {
272 tx.insert_id_allocator(name, next_id)?;
273 }
274
275 for role in BUILTIN_ROLES {
276 tx.insert_builtin_role(
277 role.id,
278 role.name.to_string(),
279 role.attributes.clone(),
280 RoleMembership::new(),
281 RoleVars::default(),
282 role.oid,
283 )?;
284 }
285 tx.insert_builtin_role(
286 RoleId::Public,
287 PUBLIC_ROLE_NAME.as_str().to_lowercase(),
288 RoleAttributesRaw::new(),
289 RoleMembership::new(),
290 RoleVars::default(),
291 ROLE_PUBLIC_OID,
292 )?;
293
294 let bootstrap_role = if let Some(role) = &options.bootstrap_role {
296 let attributes = RoleAttributesRaw::new();
297 let membership = RoleMembership::new();
298 let vars = RoleVars::default();
299
300 let (id, oid) = tx.insert_user_role(
301 role.to_string(),
302 attributes.clone(),
303 membership.clone(),
304 vars.clone(),
305 &HashSet::new(),
306 )?;
307
308 audit_events.push((
309 mz_audit_log::EventType::Create,
310 mz_audit_log::ObjectType::Role,
311 mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
312 id: id.to_string(),
313 name: role.to_string(),
314 }),
315 ));
316
317 Some(Role {
318 id,
319 name: role.to_string(),
320 attributes: attributes.into(),
321 membership,
322 vars,
323 oid,
324 })
325 } else {
326 None
327 };
328
329 let default_privileges = [
330 DefaultPrivilege {
333 object: DefaultPrivilegeObject {
334 role_id: RoleId::Public,
335 database_id: None,
336 schema_id: None,
337 object_type: mz_sql::catalog::ObjectType::Cluster,
338 },
339 acl_item: DefaultPrivilegeAclItem {
340 grantee: MZ_SUPPORT_ROLE_ID,
341 acl_mode: AclMode::USAGE,
342 },
343 },
344 DefaultPrivilege {
345 object: DefaultPrivilegeObject {
346 role_id: RoleId::Public,
347 database_id: None,
348 schema_id: None,
349 object_type: mz_sql::catalog::ObjectType::Database,
350 },
351 acl_item: DefaultPrivilegeAclItem {
352 grantee: MZ_SUPPORT_ROLE_ID,
353 acl_mode: AclMode::USAGE,
354 },
355 },
356 DefaultPrivilege {
357 object: DefaultPrivilegeObject {
358 role_id: RoleId::Public,
359 database_id: None,
360 schema_id: None,
361 object_type: mz_sql::catalog::ObjectType::Schema,
362 },
363 acl_item: DefaultPrivilegeAclItem {
364 grantee: MZ_SUPPORT_ROLE_ID,
365 acl_mode: AclMode::USAGE,
366 },
367 },
368 DefaultPrivilege {
369 object: DefaultPrivilegeObject {
370 role_id: RoleId::Public,
371 database_id: None,
372 schema_id: None,
373 object_type: mz_sql::catalog::ObjectType::Type,
374 },
375 acl_item: DefaultPrivilegeAclItem {
376 grantee: RoleId::Public,
377 acl_mode: AclMode::USAGE,
378 },
379 },
380 ];
381 tx.set_default_privileges(default_privileges.to_vec())?;
382 for DefaultPrivilege { object, acl_item } in default_privileges {
383 let object_type = match object.object_type {
384 ObjectType::Table => mz_audit_log::ObjectType::Table,
385 ObjectType::View => mz_audit_log::ObjectType::View,
386 ObjectType::MaterializedView => mz_audit_log::ObjectType::MaterializedView,
387 ObjectType::Source => mz_audit_log::ObjectType::Source,
388 ObjectType::Sink => mz_audit_log::ObjectType::Sink,
389 ObjectType::Index => mz_audit_log::ObjectType::Index,
390 ObjectType::Type => mz_audit_log::ObjectType::Type,
391 ObjectType::Role => mz_audit_log::ObjectType::Role,
392 ObjectType::Cluster => mz_audit_log::ObjectType::Cluster,
393 ObjectType::ClusterReplica => mz_audit_log::ObjectType::ClusterReplica,
394 ObjectType::Secret => mz_audit_log::ObjectType::Secret,
395 ObjectType::Connection => mz_audit_log::ObjectType::Connection,
396 ObjectType::Database => mz_audit_log::ObjectType::Database,
397 ObjectType::Schema => mz_audit_log::ObjectType::Schema,
398 ObjectType::Func => mz_audit_log::ObjectType::Func,
399 ObjectType::NetworkPolicy => mz_audit_log::ObjectType::NetworkPolicy,
400 };
401 audit_events.push((
402 mz_audit_log::EventType::Grant,
403 object_type,
404 mz_audit_log::EventDetails::AlterDefaultPrivilegeV1(
405 mz_audit_log::AlterDefaultPrivilegeV1 {
406 role_id: object.role_id.to_string(),
407 database_id: object.database_id.map(|id| id.to_string()),
408 schema_id: object.schema_id.map(|id| id.to_string()),
409 grantee_id: acl_item.grantee.to_string(),
410 privileges: acl_item.acl_mode.to_string(),
411 },
412 ),
413 ));
414 }
415
416 let mut db_privileges = vec![
417 MzAclItem {
418 grantee: RoleId::Public,
419 grantor: MZ_SYSTEM_ROLE_ID,
420 acl_mode: AclMode::USAGE,
421 },
422 MzAclItem {
423 grantee: MZ_SUPPORT_ROLE_ID,
424 grantor: MZ_SYSTEM_ROLE_ID,
425 acl_mode: AclMode::USAGE,
426 },
427 rbac::owner_privilege(mz_sql::catalog::ObjectType::Database, MZ_SYSTEM_ROLE_ID),
428 ];
429 if let Some(role) = &bootstrap_role {
431 db_privileges.push(MzAclItem {
432 grantee: role.id.clone(),
433 grantor: MZ_SYSTEM_ROLE_ID,
434 acl_mode: rbac::all_object_privileges(SystemObjectType::Object(
435 mz_sql::catalog::ObjectType::Database,
436 )),
437 })
438 };
439
440 let materialize_db_oid = tx.allocate_oid(&HashSet::new())?;
441 tx.insert_database(
442 MATERIALIZE_DATABASE_ID,
443 "materialize",
444 MZ_SYSTEM_ROLE_ID,
445 db_privileges,
446 materialize_db_oid,
447 )?;
448 audit_events.extend([
449 (
450 mz_audit_log::EventType::Create,
451 mz_audit_log::ObjectType::Database,
452 mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
453 id: MATERIALIZE_DATABASE_ID.to_string(),
454 name: "materialize".to_string(),
455 }),
456 ),
457 (
458 mz_audit_log::EventType::Grant,
459 mz_audit_log::ObjectType::Database,
460 mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
461 object_id: ObjectId::Database(MATERIALIZE_DATABASE_ID).to_string(),
462 grantee_id: RoleId::Public.to_string(),
463 grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
464 privileges: AclMode::USAGE.to_string(),
465 }),
466 ),
467 ]);
468 if let Some(role) = &bootstrap_role {
470 let role_id: RoleId = role.id.clone();
471 audit_events.push((
472 mz_audit_log::EventType::Grant,
473 mz_audit_log::ObjectType::Database,
474 mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
475 object_id: ObjectId::Database(MATERIALIZE_DATABASE_ID).to_string(),
476 grantee_id: role_id.to_string(),
477 grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
478 privileges: rbac::all_object_privileges(SystemObjectType::Object(
479 mz_sql::catalog::ObjectType::Database,
480 ))
481 .to_string(),
482 }),
483 ));
484 }
485
486 let public_schema_oid = tx.allocate_oid(&HashSet::new())?;
487 let public_schema = Schema {
488 id: SchemaId::User(PUBLIC_SCHEMA_ID),
489 oid: public_schema_oid,
490 database_id: Some(MATERIALIZE_DATABASE_ID),
491 name: "public".to_string(),
492 owner_id: MZ_SYSTEM_ROLE_ID,
493 privileges: vec![
494 MzAclItem {
495 grantee: RoleId::Public,
496 grantor: MZ_SYSTEM_ROLE_ID,
497 acl_mode: AclMode::USAGE,
498 },
499 MzAclItem {
500 grantee: MZ_SUPPORT_ROLE_ID,
501 grantor: MZ_SYSTEM_ROLE_ID,
502 acl_mode: AclMode::USAGE,
503 },
504 rbac::owner_privilege(mz_sql::catalog::ObjectType::Schema, MZ_SYSTEM_ROLE_ID),
505 ]
506 .into_iter()
507 .chain(bootstrap_role.as_ref().map(|role| MzAclItem {
509 grantee: role.id.clone(),
510 grantor: MZ_SYSTEM_ROLE_ID,
511 acl_mode: rbac::all_object_privileges(SystemObjectType::Object(
512 mz_sql::catalog::ObjectType::Schema,
513 )),
514 }))
515 .collect(),
516 };
517
518 for schema in SYSTEM_SCHEMAS.values().chain(iter::once(&&public_schema)) {
519 tx.insert_schema(
520 schema.id,
521 schema.database_id,
522 schema.name.clone(),
523 schema.owner_id,
524 schema.privileges.clone(),
525 schema.oid,
526 )?;
527 }
528 audit_events.push((
529 mz_audit_log::EventType::Create,
530 mz_audit_log::ObjectType::Schema,
531 mz_audit_log::EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
532 id: PUBLIC_SCHEMA_ID.to_string(),
533 name: "public".to_string(),
534 database_name: Some("materialize".to_string()),
535 }),
536 ));
537 if let Some(role) = &bootstrap_role {
538 let role_id: RoleId = role.id.clone();
539 audit_events.push((
540 mz_audit_log::EventType::Grant,
541 mz_audit_log::ObjectType::Schema,
542 mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
543 object_id: ObjectId::Schema((
544 ResolvedDatabaseSpecifier::Id(MATERIALIZE_DATABASE_ID),
545 SchemaSpecifier::Id(SchemaId::User(PUBLIC_SCHEMA_ID)),
546 ))
547 .to_string(),
548 grantee_id: role_id.to_string(),
549 grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
550 privileges: rbac::all_object_privileges(SystemObjectType::Object(
551 mz_sql::catalog::ObjectType::Schema,
552 ))
553 .to_string(),
554 }),
555 ));
556 }
557
558 let mut cluster_privileges = vec![
559 MzAclItem {
560 grantee: RoleId::Public,
561 grantor: MZ_SYSTEM_ROLE_ID,
562 acl_mode: AclMode::USAGE,
563 },
564 MzAclItem {
565 grantee: MZ_SUPPORT_ROLE_ID,
566 grantor: MZ_SYSTEM_ROLE_ID,
567 acl_mode: AclMode::USAGE,
568 },
569 rbac::owner_privilege(mz_sql::catalog::ObjectType::Cluster, MZ_SYSTEM_ROLE_ID),
570 ];
571
572 if let Some(role) = &bootstrap_role {
574 cluster_privileges.push(MzAclItem {
575 grantee: role.id.clone(),
576 grantor: MZ_SYSTEM_ROLE_ID,
577 acl_mode: rbac::all_object_privileges(SystemObjectType::Object(
578 mz_sql::catalog::ObjectType::Cluster,
579 )),
580 });
581 };
582
583 tx.insert_network_policy(
584 DEFAULT_USER_NETWORK_POLICY_ID,
585 DEFAULT_USER_NETWORK_POLICY_NAME.to_string(),
586 DEFAULT_USER_NETWORK_POLICY_RULES
587 .into_iter()
588 .map(|(name, action, direction, ip_str)| NetworkPolicyRule {
589 name: name.to_string(),
590 action: action.clone(),
591 direction: direction.clone(),
592 address: PolicyAddress(
593 IpNet::from_str(ip_str).expect("default policy must provide valid ip"),
594 ),
595 })
596 .collect::<Vec<NetworkPolicyRule>>(),
597 DEFAULT_USER_NETWORK_POLICY_PRIVILEGES.clone(),
598 MZ_SYSTEM_ROLE_ID,
599 NETWORK_POLICIES_DEFAULT_POLICY_OID,
600 )?;
601 let id = tx.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
605 assert!(DEFAULT_USER_NETWORK_POLICY_ID == NetworkPolicyId::User(id));
606
607 audit_events.extend([(
608 mz_audit_log::EventType::Create,
609 mz_audit_log::ObjectType::NetworkPolicy,
610 mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
611 id: DEFAULT_USER_NETWORK_POLICY_ID.to_string(),
612 name: DEFAULT_USER_NETWORK_POLICY_NAME.to_string(),
613 }),
614 )]);
615
616 tx.insert_user_cluster(
617 DEFAULT_USER_CLUSTER_ID,
618 DEFAULT_USER_CLUSTER_NAME,
619 Vec::new(),
620 MZ_SYSTEM_ROLE_ID,
621 cluster_privileges,
622 default_cluster_config(options)?,
623 &HashSet::new(),
624 )?;
625 audit_events.extend([
626 (
627 mz_audit_log::EventType::Create,
628 mz_audit_log::ObjectType::Cluster,
629 mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
630 id: DEFAULT_USER_CLUSTER_ID.to_string(),
631 name: DEFAULT_USER_CLUSTER_NAME.to_string(),
632 }),
633 ),
634 (
635 mz_audit_log::EventType::Grant,
636 mz_audit_log::ObjectType::Cluster,
637 mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
638 object_id: ObjectId::Cluster(DEFAULT_USER_CLUSTER_ID).to_string(),
639 grantee_id: RoleId::Public.to_string(),
640 grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
641 privileges: AclMode::USAGE.to_string(),
642 }),
643 ),
644 ]);
645
646 if let Some(role) = &bootstrap_role {
648 let role_id: RoleId = role.id.clone();
649 audit_events.push((
650 mz_audit_log::EventType::Grant,
651 mz_audit_log::ObjectType::Cluster,
652 mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
653 object_id: ObjectId::Cluster(DEFAULT_USER_CLUSTER_ID).to_string(),
654 grantee_id: role_id.to_string(),
655 grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
656 privileges: rbac::all_object_privileges(SystemObjectType::Object(
657 mz_sql::catalog::ObjectType::Cluster,
658 ))
659 .to_string(),
660 }),
661 ));
662 }
663
664 for i in 0..options.default_cluster_replication_factor {
665 let replica_id = ReplicaId::User(DEFAULT_USER_REPLICA_ID.inner_id() + u64::from(i));
666 let replica_name = format!("r{}", i + 1);
667 tx.insert_cluster_replica_with_id(
668 DEFAULT_USER_CLUSTER_ID,
669 replica_id,
670 &replica_name,
671 default_replica_config(options)?,
672 MZ_SYSTEM_ROLE_ID,
673 )?;
674 audit_events.push((
675 mz_audit_log::EventType::Create,
676 mz_audit_log::ObjectType::ClusterReplica,
677 mz_audit_log::EventDetails::CreateClusterReplicaV4(
678 mz_audit_log::CreateClusterReplicaV4 {
679 cluster_id: DEFAULT_USER_CLUSTER_ID.to_string(),
680 cluster_name: DEFAULT_USER_CLUSTER_NAME.to_string(),
681 replica_name,
682 replica_id: Some(replica_id.to_string()),
683 logical_size: options.default_cluster_replica_size.to_string(),
684 billed_as: None,
685 internal: false,
686 reason: CreateOrDropClusterReplicaReasonV1::System,
687 scheduling_policies: None,
688 },
689 ),
690 ));
691 }
692
693 let system_privileges = [MzAclItem {
694 grantee: MZ_SYSTEM_ROLE_ID,
695 grantor: MZ_SYSTEM_ROLE_ID,
696 acl_mode: rbac::all_object_privileges(SystemObjectType::System),
697 }]
698 .into_iter()
699 .chain(bootstrap_role.as_ref().map(|role| MzAclItem {
701 grantee: role.id.clone(),
702 grantor: MZ_SYSTEM_ROLE_ID,
703 acl_mode: rbac::all_object_privileges(SystemObjectType::System),
704 }));
705 tx.set_system_privileges(system_privileges.clone().collect())?;
706 for system_privilege in system_privileges {
707 audit_events.push((
708 mz_audit_log::EventType::Grant,
709 mz_audit_log::ObjectType::System,
710 mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
711 object_id: "SYSTEM".to_string(),
712 grantee_id: system_privilege.grantee.to_string(),
713 grantor_id: system_privilege.grantor.to_string(),
714 privileges: system_privilege.acl_mode.to_string(),
715 }),
716 ));
717 }
718
719 let mut audit_events_with_id = Vec::with_capacity(audit_events.len());
721 for (ty, obj, details) in audit_events {
722 let id = tx.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())?;
723 audit_events_with_id.push((id, ty, obj, details));
724 }
725
726 for (id, event_type, object_type, details) in audit_events_with_id {
727 tx.insert_audit_log_event(VersionedEvent::V1(EventV1 {
728 id,
729 event_type,
730 object_type,
731 details,
732 user: None,
733 occurred_at: initial_ts,
734 }));
735 }
736
737 for (key, value) in [
738 (USER_VERSION_KEY.to_string(), CATALOG_VERSION),
739 (SYSTEM_CONFIG_SYNCED_KEY.to_string(), 0),
740 ] {
741 tx.insert_config(key, value)?;
742 }
743
744 for (name, value) in [
745 (
746 CATALOG_CONTENT_VERSION_KEY.to_string(),
747 catalog_content_version,
748 ),
749 (
750 BUILTIN_MIGRATION_SHARD_KEY.to_string(),
751 ShardId::new().to_string(),
752 ),
753 (
754 EXPRESSION_CACHE_SHARD_KEY.to_string(),
755 ShardId::new().to_string(),
756 ),
757 ] {
758 tx.set_setting(name, Some(value))?;
759 }
760
761 if tx
762 .get_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string())
763 .is_none()
764 {
765 let mut nonce = [0u8; 24];
766 openssl::rand::rand_bytes(&mut nonce).expect("random number generation failed");
767 tx.set_setting(
768 MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
769 Some(BASE64_STANDARD.encode(nonce)),
770 )?;
771 }
772
773 Ok(())
774}
775
776pub fn resolve_system_schema(name: &str) -> &Schema {
777 SYSTEM_SCHEMAS
778 .get(name)
779 .unwrap_or_else(|| panic!("unable to resolve system schema: {name}"))
780}
781
782fn default_cluster_config(args: &BootstrapArgs) -> Result<ClusterConfig, CatalogError> {
784 Ok(ClusterConfig {
785 variant: ClusterVariant::Managed(ClusterVariantManaged {
786 size: args.default_cluster_replica_size.to_string(),
787 replication_factor: args.default_cluster_replication_factor,
788 availability_zones: vec![],
789 logging: ReplicaLogging {
790 log_logging: false,
791 interval: Some(Duration::from_secs(1)),
792 },
793 optimizer_feature_overrides: Default::default(),
794 schedule: Default::default(),
795 }),
796 workload_class: None,
797 })
798}
799
800fn default_replica_config(args: &BootstrapArgs) -> Result<ReplicaConfig, CatalogError> {
802 Ok(ReplicaConfig {
803 location: ReplicaLocation::Managed {
804 size: args.default_cluster_replica_size.to_string(),
805 availability_zone: None,
806 internal: false,
807 billed_as: None,
808 pending: false,
809 },
810 logging: ReplicaLogging {
811 log_logging: false,
812 interval: Some(Duration::from_secs(1)),
813 },
814 })
815}