1use std::collections::BTreeMap;
11use std::iter;
12use std::str::FromStr;
13use std::sync::LazyLock;
14use std::time::Duration;
15
16use ipnet::IpNet;
17use itertools::max;
18use mz_audit_log::{CreateOrDropClusterReplicaReasonV1, EventV1, VersionedEvent};
19use mz_controller::clusters::ReplicaLogging;
20use mz_controller_types::{ClusterId, ReplicaId};
21use mz_ore::collections::HashSet;
22use mz_ore::now::EpochMillis;
23use mz_persist_types::ShardId;
24use mz_pgrepr::oid::{
25 FIRST_USER_OID, NETWORK_POLICIES_DEFAULT_POLICY_OID, ROLE_PUBLIC_OID,
26 SCHEMA_INFORMATION_SCHEMA_OID, SCHEMA_MZ_CATALOG_OID, SCHEMA_MZ_CATALOG_UNSTABLE_OID,
27 SCHEMA_MZ_INTERNAL_OID, SCHEMA_MZ_INTROSPECTION_OID, SCHEMA_MZ_UNSAFE_OID,
28 SCHEMA_PG_CATALOG_OID,
29};
30use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
31use mz_repr::network_policy_id::NetworkPolicyId;
32use mz_repr::role_id::RoleId;
33use mz_sql::catalog::{
34 DefaultPrivilegeAclItem, DefaultPrivilegeObject, ObjectType, RoleAttributes, RoleMembership,
35 RoleVars, SystemObjectType,
36};
37use mz_sql::names::{
38 DatabaseId, ObjectId, PUBLIC_ROLE_NAME, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier,
39};
40use mz_sql::plan::{NetworkPolicyRule, PolicyAddress};
41use mz_sql::rbac;
42use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
43
44use crate::builtin::BUILTIN_ROLES;
45use crate::durable::upgrade::CATALOG_VERSION;
46use crate::durable::{
47 AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, BootstrapArgs,
48 CATALOG_CONTENT_VERSION_KEY, CatalogError, ClusterConfig, ClusterVariant,
49 ClusterVariantManaged, DATABASE_ID_ALLOC_KEY, DefaultPrivilege, EXPRESSION_CACHE_SHARD_KEY,
50 OID_ALLOC_KEY, ReplicaConfig, ReplicaLocation, Role, SCHEMA_ID_ALLOC_KEY,
51 STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY, SYSTEM_REPLICA_ID_ALLOC_KEY, Schema,
52 Transaction, USER_CLUSTER_ID_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY,
53 USER_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY,
54};
55
/// Config key under which `initialize` stores `CATALOG_VERSION`.
pub const USER_VERSION_KEY: &str = "user_version";
/// Config key that `initialize` writes with the value `0`.
///
/// NOTE(review): presumably flipped once the system configuration has been
/// synced at least once; nothing in this file updates it — confirm.
pub(crate) const SYSTEM_CONFIG_SYNCED_KEY: &str = "system_config_synced";

/// Key string `enable_0dt_deployment`.
///
/// NOTE(review): not referenced in this file; presumably a config key that
/// mirrors the system variable of the same name — confirm against callers.
pub(crate) const ENABLE_0DT_DEPLOYMENT: &str = "enable_0dt_deployment";

/// Key string `with_0dt_deployment_max_wait`.
///
/// NOTE(review): not referenced in this file; presumably a config key that
/// mirrors the system variable of the same name — confirm against callers.
pub(crate) const WITH_0DT_DEPLOYMENT_MAX_WAIT: &str = "with_0dt_deployment_max_wait";

/// Key string `with_0dt_deployment_ddl_check_interval`.
///
/// NOTE(review): not referenced in this file; presumably a config key that
/// mirrors the system variable of the same name — confirm against callers.
pub(crate) const WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL: &str =
    "with_0dt_deployment_ddl_check_interval";

/// Key string `enable_0dt_deployment_panic_after_timeout`.
///
/// NOTE(review): not referenced in this file; presumably a config key that
/// mirrors the system variable of the same name — confirm against callers.
pub(crate) const ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT: &str =
    "enable_0dt_deployment_panic_after_timeout";

/// Name of the ID allocator registered as "user"; seeded with
/// `DEFAULT_ALLOCATOR_ID` by `initialize`.
const USER_ID_ALLOC_KEY: &str = "user";
/// Name of the ID allocator registered as "system"; seeded with
/// `DEFAULT_ALLOCATOR_ID` by `initialize`.
const SYSTEM_ID_ALLOC_KEY: &str = "system";

/// ID of the user cluster that `initialize` creates.
const DEFAULT_USER_CLUSTER_ID: ClusterId = ClusterId::User(1);
/// Name of the user cluster that `initialize` creates.
const DEFAULT_USER_CLUSTER_NAME: &str = "quickstart";

/// ID of the first replica of the default user cluster; further replicas get
/// the consecutive IDs that follow it.
const DEFAULT_USER_REPLICA_ID: ReplicaId = ReplicaId::User(1);

/// Numeric part of the ID of the `materialize` database.
const MATERIALIZE_DATABASE_ID_VAL: u64 = 1;
/// ID of the `materialize` database that `initialize` creates.
const MATERIALIZE_DATABASE_ID: DatabaseId = DatabaseId::User(MATERIALIZE_DATABASE_ID_VAL);

// Numeric IDs of the schemas created by `initialize`. All of them are used as
// `SchemaId::System(..)` except `PUBLIC_SCHEMA_ID`, which is used as
// `SchemaId::User(..)`. The schema ID allocator is seeded with one more than
// the largest value in this list.
const MZ_CATALOG_SCHEMA_ID: u64 = 1;
const PG_CATALOG_SCHEMA_ID: u64 = 2;
const PUBLIC_SCHEMA_ID: u64 = 3;
const MZ_INTERNAL_SCHEMA_ID: u64 = 4;
const INFORMATION_SCHEMA_ID: u64 = 5;
pub const MZ_UNSAFE_SCHEMA_ID: u64 = 6;
pub const MZ_CATALOG_UNSTABLE_SCHEMA_ID: u64 = 7;
pub const MZ_INTROSPECTION_SCHEMA_ID: u64 = 8;

/// Initial `next_id` for allocators that have no pre-assigned IDs.
const DEFAULT_ALLOCATOR_ID: u64 = 1;

/// ID of the network policy that `initialize` creates.
pub const DEFAULT_USER_NETWORK_POLICY_ID: NetworkPolicyId = NetworkPolicyId::User(1);
/// Name of the network policy that `initialize` creates.
pub const DEFAULT_USER_NETWORK_POLICY_NAME: &str = "default";
/// Rules of the default network policy, as
/// `(rule name, action, direction, address)` tuples.
///
/// The address string is parsed with `IpNet::from_str` in `initialize`, which
/// panics if it is not valid.
pub const DEFAULT_USER_NETWORK_POLICY_RULES: &[(
    &str,
    mz_sql::plan::NetworkPolicyRuleAction,
    mz_sql::plan::NetworkPolicyRuleDirection,
    &str,
)] = &[(
    "open_ingress",
    mz_sql::plan::NetworkPolicyRuleAction::Allow,
    mz_sql::plan::NetworkPolicyRuleDirection::Ingress,
    "0.0.0.0/0",
)];
127
128static DEFAULT_USER_NETWORK_POLICY_PRIVILEGES: LazyLock<Vec<MzAclItem>> = LazyLock::new(|| {
129 vec![rbac::owner_privilege(
130 ObjectType::NetworkPolicy,
131 MZ_SYSTEM_ROLE_ID,
132 )]
133});
134
135static SYSTEM_SCHEMA_PRIVILEGES: LazyLock<Vec<MzAclItem>> = LazyLock::new(|| {
136 vec![
137 rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Schema),
138 MzAclItem {
139 grantee: MZ_SUPPORT_ROLE_ID,
140 grantor: MZ_SYSTEM_ROLE_ID,
141 acl_mode: AclMode::USAGE,
142 },
143 rbac::owner_privilege(mz_sql::catalog::ObjectType::Schema, MZ_SYSTEM_ROLE_ID),
144 ]
145});
146
// The system schemas. Each one has a fixed `SchemaId::System` ID and a fixed
// OID, belongs to no database (`database_id: None`), is owned by
// `MZ_SYSTEM_ROLE_ID`, and carries its own copy of `SYSTEM_SCHEMA_PRIVILEGES`.

/// The `mz_catalog` system schema.
static MZ_CATALOG_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
    id: SchemaId::System(MZ_CATALOG_SCHEMA_ID),
    oid: SCHEMA_MZ_CATALOG_OID,
    database_id: None,
    name: "mz_catalog".to_string(),
    owner_id: MZ_SYSTEM_ROLE_ID,
    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
});
/// The `pg_catalog` system schema.
static PG_CATALOG_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
    id: SchemaId::System(PG_CATALOG_SCHEMA_ID),
    oid: SCHEMA_PG_CATALOG_OID,
    database_id: None,
    name: "pg_catalog".to_string(),
    owner_id: MZ_SYSTEM_ROLE_ID,
    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
});
/// The `mz_internal` system schema.
static MZ_INTERNAL_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
    id: SchemaId::System(MZ_INTERNAL_SCHEMA_ID),
    oid: SCHEMA_MZ_INTERNAL_OID,
    database_id: None,
    name: "mz_internal".to_string(),
    owner_id: MZ_SYSTEM_ROLE_ID,
    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
});
/// The `information_schema` system schema.
static INFORMATION_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
    id: SchemaId::System(INFORMATION_SCHEMA_ID),
    oid: SCHEMA_INFORMATION_SCHEMA_OID,
    database_id: None,
    name: "information_schema".to_string(),
    owner_id: MZ_SYSTEM_ROLE_ID,
    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
});
/// The `mz_unsafe` system schema.
static MZ_UNSAFE_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
    id: SchemaId::System(MZ_UNSAFE_SCHEMA_ID),
    oid: SCHEMA_MZ_UNSAFE_OID,
    database_id: None,
    name: "mz_unsafe".to_string(),
    owner_id: MZ_SYSTEM_ROLE_ID,
    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
});
/// The `mz_catalog_unstable` system schema.
static MZ_CATALOG_UNSTABLE_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
    id: SchemaId::System(MZ_CATALOG_UNSTABLE_SCHEMA_ID),
    oid: SCHEMA_MZ_CATALOG_UNSTABLE_OID,
    database_id: None,
    name: "mz_catalog_unstable".to_string(),
    owner_id: MZ_SYSTEM_ROLE_ID,
    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
});
/// The `mz_introspection` system schema.
static MZ_INTROSPECTION_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
    id: SchemaId::System(MZ_INTROSPECTION_SCHEMA_ID),
    oid: SCHEMA_MZ_INTROSPECTION_OID,
    database_id: None,
    name: "mz_introspection".to_string(),
    owner_id: MZ_SYSTEM_ROLE_ID,
    privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
});
203static SYSTEM_SCHEMAS: LazyLock<BTreeMap<&str, &Schema>> = LazyLock::new(|| {
204 [
205 &*MZ_CATALOG_SCHEMA,
206 &*PG_CATALOG_SCHEMA,
207 &*MZ_INTERNAL_SCHEMA,
208 &*INFORMATION_SCHEMA,
209 &*MZ_UNSAFE_SCHEMA,
210 &*MZ_CATALOG_UNSTABLE_SCHEMA,
211 &*MZ_INTROSPECTION_SCHEMA,
212 ]
213 .into_iter()
214 .map(|s| (&*s.name, s))
215 .collect()
216});
217
/// Writes the initial catalog contents into `tx`.
///
/// In order, this:
/// 1. seeds every ID allocator,
/// 2. inserts the builtin roles and the `PUBLIC` role,
/// 3. creates the bootstrap role if `options.bootstrap_role` is set,
/// 4. installs the default privileges,
/// 5. creates the `materialize` database, the system schemas and the
///    `public` schema,
/// 6. creates the default network policy,
/// 7. creates the default user cluster and its replicas,
/// 8. installs the system privileges,
/// 9. writes the collected audit log events, and
/// 10. writes the initial config values and settings.
///
/// Every audit event written here has `user: None` and
/// `occurred_at: initial_ts`.
///
/// # Errors
///
/// Propagates the first error returned by any `tx` operation or by the
/// replica size lookup on `options.cluster_replica_size_map`.
///
/// # Panics
///
/// Panics if an address in `DEFAULT_USER_NETWORK_POLICY_RULES` fails to
/// parse, or if the network policy ID allocator does not hand out
/// `DEFAULT_USER_NETWORK_POLICY_ID`.
#[mz_ore::instrument]
pub(crate) async fn initialize(
    tx: &mut Transaction<'_>,
    options: &BootstrapArgs,
    initial_ts: EpochMillis,
    catalog_content_version: String,
) -> Result<(), CatalogError> {
    // Audit events are collected as (event type, object type, details)
    // tuples; they only get IDs and are written near the end of the function.
    let mut audit_events = vec![];

    // Seed every ID allocator with its first free ID. Allocators whose
    // objects are pre-created below with hard-coded IDs start past those IDs.
    for (name, next_id) in [
        (USER_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
        (SYSTEM_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
        (
            DATABASE_ID_ALLOC_KEY.to_string(),
            MATERIALIZE_DATABASE_ID_VAL + 1,
        ),
        (
            SCHEMA_ID_ALLOC_KEY.to_string(),
            // One past the largest pre-assigned schema ID, system and user
            // schemas alike.
            max(&[
                MZ_CATALOG_SCHEMA_ID,
                PG_CATALOG_SCHEMA_ID,
                PUBLIC_SCHEMA_ID,
                MZ_INTERNAL_SCHEMA_ID,
                INFORMATION_SCHEMA_ID,
                MZ_UNSAFE_SCHEMA_ID,
                MZ_CATALOG_UNSTABLE_SCHEMA_ID,
                MZ_INTROSPECTION_SCHEMA_ID,
            ])
            .expect("known to be non-empty")
                + 1,
        ),
        (USER_ROLE_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
        (
            USER_CLUSTER_ID_ALLOC_KEY.to_string(),
            DEFAULT_USER_CLUSTER_ID.inner_id() + 1,
        ),
        (
            SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string(),
            DEFAULT_ALLOCATOR_ID,
        ),
        (
            USER_REPLICA_ID_ALLOC_KEY.to_string(),
            // The replica loop below uses the IDs
            // `DEFAULT_USER_REPLICA_ID + 0 .. + replication_factor`, so the
            // first free ID is just past that range.
            DEFAULT_USER_REPLICA_ID.inner_id()
                + u64::from(options.default_cluster_replication_factor),
        ),
        (
            SYSTEM_REPLICA_ID_ALLOC_KEY.to_string(),
            DEFAULT_ALLOCATOR_ID,
        ),
        (
            // Not advanced here: the allocator is bumped explicitly after the
            // default network policy is inserted below.
            USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string(),
            DEFAULT_ALLOCATOR_ID,
        ),
        (AUDIT_LOG_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
        (STORAGE_USAGE_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
        (OID_ALLOC_KEY.to_string(), FIRST_USER_OID.into()),
    ] {
        tx.insert_id_allocator(name, next_id)?;
    }

    // Builtin roles keep their predefined ID, attributes and OID, and start
    // with empty membership and default variables.
    for role in BUILTIN_ROLES {
        tx.insert_builtin_role(
            role.id,
            role.name.to_string(),
            role.attributes.clone(),
            RoleMembership::new(),
            RoleVars::default(),
            role.oid,
        )?;
    }
    // The PUBLIC role is inserted separately, under its lowercased name.
    tx.insert_builtin_role(
        RoleId::Public,
        PUBLIC_ROLE_NAME.as_str().to_lowercase(),
        RoleAttributes::new(),
        RoleMembership::new(),
        RoleVars::default(),
        ROLE_PUBLIC_OID,
    )?;

    // Optionally create the bootstrap role as a regular user role. When
    // present, it is granted all privileges on the objects created below.
    let bootstrap_role = if let Some(role) = &options.bootstrap_role {
        let attributes = RoleAttributes::new();
        let membership = RoleMembership::new();
        let vars = RoleVars::default();

        // `insert_user_role` returns the new role's ID and OID.
        // NOTE(review): the empty `HashSet` argument is presumably a set of
        // identifiers to avoid during allocation — confirm.
        let (id, oid) = tx.insert_user_role(
            role.to_string(),
            attributes.clone(),
            membership.clone(),
            vars.clone(),
            &HashSet::new(),
        )?;

        audit_events.push((
            mz_audit_log::EventType::Create,
            mz_audit_log::ObjectType::Role,
            mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                id: id.to_string(),
                name: role.to_string(),
            }),
        ));

        Some(Role {
            id,
            name: role.to_string(),
            attributes,
            membership,
            vars,
            oid,
        })
    } else {
        None
    };

    // Default privileges, all keyed on objects created by `RoleId::Public`
    // in any database and schema: `MZ_SUPPORT_ROLE_ID` gets USAGE on
    // clusters, databases and schemas, and `RoleId::Public` gets USAGE on
    // types.
    let default_privileges = [
        DefaultPrivilege {
            object: DefaultPrivilegeObject {
                role_id: RoleId::Public,
                database_id: None,
                schema_id: None,
                object_type: mz_sql::catalog::ObjectType::Cluster,
            },
            acl_item: DefaultPrivilegeAclItem {
                grantee: MZ_SUPPORT_ROLE_ID,
                acl_mode: AclMode::USAGE,
            },
        },
        DefaultPrivilege {
            object: DefaultPrivilegeObject {
                role_id: RoleId::Public,
                database_id: None,
                schema_id: None,
                object_type: mz_sql::catalog::ObjectType::Database,
            },
            acl_item: DefaultPrivilegeAclItem {
                grantee: MZ_SUPPORT_ROLE_ID,
                acl_mode: AclMode::USAGE,
            },
        },
        DefaultPrivilege {
            object: DefaultPrivilegeObject {
                role_id: RoleId::Public,
                database_id: None,
                schema_id: None,
                object_type: mz_sql::catalog::ObjectType::Schema,
            },
            acl_item: DefaultPrivilegeAclItem {
                grantee: MZ_SUPPORT_ROLE_ID,
                acl_mode: AclMode::USAGE,
            },
        },
        DefaultPrivilege {
            object: DefaultPrivilegeObject {
                role_id: RoleId::Public,
                database_id: None,
                schema_id: None,
                object_type: mz_sql::catalog::ObjectType::Type,
            },
            acl_item: DefaultPrivilegeAclItem {
                grantee: RoleId::Public,
                acl_mode: AclMode::USAGE,
            },
        },
    ];
    tx.set_default_privileges(default_privileges.to_vec())?;
    // Record one Grant audit event per default privilege.
    for DefaultPrivilege { object, acl_item } in default_privileges {
        // Map the SQL-layer object type onto the audit log's object type. The
        // match has no wildcard arm, so a new `ObjectType` variant fails to
        // compile until it is mapped here.
        let object_type = match object.object_type {
            ObjectType::Table => mz_audit_log::ObjectType::Table,
            ObjectType::View => mz_audit_log::ObjectType::View,
            ObjectType::MaterializedView => mz_audit_log::ObjectType::MaterializedView,
            ObjectType::Source => mz_audit_log::ObjectType::Source,
            ObjectType::Sink => mz_audit_log::ObjectType::Sink,
            ObjectType::Index => mz_audit_log::ObjectType::Index,
            ObjectType::Type => mz_audit_log::ObjectType::Type,
            ObjectType::Role => mz_audit_log::ObjectType::Role,
            ObjectType::Cluster => mz_audit_log::ObjectType::Cluster,
            ObjectType::ClusterReplica => mz_audit_log::ObjectType::ClusterReplica,
            ObjectType::Secret => mz_audit_log::ObjectType::Secret,
            ObjectType::Connection => mz_audit_log::ObjectType::Connection,
            ObjectType::Database => mz_audit_log::ObjectType::Database,
            ObjectType::Schema => mz_audit_log::ObjectType::Schema,
            ObjectType::Func => mz_audit_log::ObjectType::Func,
            ObjectType::ContinualTask => mz_audit_log::ObjectType::ContinualTask,
            ObjectType::NetworkPolicy => mz_audit_log::ObjectType::NetworkPolicy,
        };
        audit_events.push((
            mz_audit_log::EventType::Grant,
            object_type,
            mz_audit_log::EventDetails::AlterDefaultPrivilegeV1(
                mz_audit_log::AlterDefaultPrivilegeV1 {
                    role_id: object.role_id.to_string(),
                    database_id: object.database_id.map(|id| id.to_string()),
                    schema_id: object.schema_id.map(|id| id.to_string()),
                    grantee_id: acl_item.grantee.to_string(),
                    privileges: acl_item.acl_mode.to_string(),
                },
            ),
        ));
    }

    // Privileges on the `materialize` database: USAGE for PUBLIC and
    // mz_support, the owner privilege for mz_system, and every database
    // privilege for the bootstrap role, if there is one.
    let mut db_privileges = vec![
        MzAclItem {
            grantee: RoleId::Public,
            grantor: MZ_SYSTEM_ROLE_ID,
            acl_mode: AclMode::USAGE,
        },
        MzAclItem {
            grantee: MZ_SUPPORT_ROLE_ID,
            grantor: MZ_SYSTEM_ROLE_ID,
            acl_mode: AclMode::USAGE,
        },
        rbac::owner_privilege(mz_sql::catalog::ObjectType::Database, MZ_SYSTEM_ROLE_ID),
    ];
    if let Some(role) = &bootstrap_role {
        db_privileges.push(MzAclItem {
            grantee: role.id.clone(),
            grantor: MZ_SYSTEM_ROLE_ID,
            acl_mode: rbac::all_object_privileges(SystemObjectType::Object(
                mz_sql::catalog::ObjectType::Database,
            )),
        })
    };

    // The database OID is the first OID allocated in this function; the
    // `public` schema's OID is allocated after it, further down.
    let materialize_db_oid = tx.allocate_oid(&HashSet::new())?;
    tx.insert_database(
        MATERIALIZE_DATABASE_ID,
        "materialize",
        MZ_SYSTEM_ROLE_ID,
        db_privileges,
        materialize_db_oid,
    )?;
    // Only the creation and the PUBLIC grant are audited here; the mz_support
    // grant and the owner privilege in `db_privileges` get no event.
    audit_events.extend([
        (
            mz_audit_log::EventType::Create,
            mz_audit_log::ObjectType::Database,
            mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                id: MATERIALIZE_DATABASE_ID.to_string(),
                name: "materialize".to_string(),
            }),
        ),
        (
            mz_audit_log::EventType::Grant,
            mz_audit_log::ObjectType::Database,
            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
                object_id: ObjectId::Database(MATERIALIZE_DATABASE_ID).to_string(),
                grantee_id: RoleId::Public.to_string(),
                grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
                privileges: AclMode::USAGE.to_string(),
            }),
        ),
    ]);
    // Audit the bootstrap role's grant on the database.
    if let Some(role) = &bootstrap_role {
        let role_id: RoleId = role.id.clone();
        audit_events.push((
            mz_audit_log::EventType::Grant,
            mz_audit_log::ObjectType::Database,
            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
                object_id: ObjectId::Database(MATERIALIZE_DATABASE_ID).to_string(),
                grantee_id: role_id.to_string(),
                grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
                privileges: rbac::all_object_privileges(SystemObjectType::Object(
                    mz_sql::catalog::ObjectType::Database,
                ))
                .to_string(),
            }),
        ));
    }

    // The `public` schema is a user schema inside the `materialize` database,
    // owned by mz_system. Its privileges mirror the database's: USAGE for
    // PUBLIC and mz_support, the owner privilege, and every schema privilege
    // for the bootstrap role, if there is one.
    let public_schema_oid = tx.allocate_oid(&HashSet::new())?;
    let public_schema = Schema {
        id: SchemaId::User(PUBLIC_SCHEMA_ID),
        oid: public_schema_oid,
        database_id: Some(MATERIALIZE_DATABASE_ID),
        name: "public".to_string(),
        owner_id: MZ_SYSTEM_ROLE_ID,
        privileges: vec![
            MzAclItem {
                grantee: RoleId::Public,
                grantor: MZ_SYSTEM_ROLE_ID,
                acl_mode: AclMode::USAGE,
            },
            MzAclItem {
                grantee: MZ_SUPPORT_ROLE_ID,
                grantor: MZ_SYSTEM_ROLE_ID,
                acl_mode: AclMode::USAGE,
            },
            rbac::owner_privilege(mz_sql::catalog::ObjectType::Schema, MZ_SYSTEM_ROLE_ID),
        ]
        .into_iter()
        // Chaining an `Option` appends zero or one extra items.
        .chain(bootstrap_role.as_ref().map(|role| MzAclItem {
            grantee: role.id.clone(),
            grantor: MZ_SYSTEM_ROLE_ID,
            acl_mode: rbac::all_object_privileges(SystemObjectType::Object(
                mz_sql::catalog::ObjectType::Schema,
            )),
        }))
        .collect(),
    };

    // Insert the system schemas (iterated in the map's name order), followed
    // by the `public` schema.
    for schema in SYSTEM_SCHEMAS.values().chain(iter::once(&&public_schema)) {
        tx.insert_schema(
            schema.id,
            schema.database_id,
            schema.name.clone(),
            schema.owner_id,
            schema.privileges.clone(),
            schema.oid,
        )?;
    }
    // Only the `public` schema gets a Create audit event; the system schemas
    // inserted above get none.
    audit_events.push((
        mz_audit_log::EventType::Create,
        mz_audit_log::ObjectType::Schema,
        mz_audit_log::EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
            id: PUBLIC_SCHEMA_ID.to_string(),
            name: "public".to_string(),
            database_name: Some("materialize".to_string()),
        }),
    ));
    // Audit the bootstrap role's grant on the `public` schema.
    if let Some(role) = &bootstrap_role {
        let role_id: RoleId = role.id.clone();
        audit_events.push((
            mz_audit_log::EventType::Grant,
            mz_audit_log::ObjectType::Schema,
            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
                object_id: ObjectId::Schema((
                    ResolvedDatabaseSpecifier::Id(MATERIALIZE_DATABASE_ID),
                    SchemaSpecifier::Id(SchemaId::User(PUBLIC_SCHEMA_ID)),
                ))
                .to_string(),
                grantee_id: role_id.to_string(),
                grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
                privileges: rbac::all_object_privileges(SystemObjectType::Object(
                    mz_sql::catalog::ObjectType::Schema,
                ))
                .to_string(),
            }),
        ));
    }

    // Privileges on the default user cluster, following the same pattern as
    // the database and the `public` schema.
    let mut cluster_privileges = vec![
        MzAclItem {
            grantee: RoleId::Public,
            grantor: MZ_SYSTEM_ROLE_ID,
            acl_mode: AclMode::USAGE,
        },
        MzAclItem {
            grantee: MZ_SUPPORT_ROLE_ID,
            grantor: MZ_SYSTEM_ROLE_ID,
            acl_mode: AclMode::USAGE,
        },
        rbac::owner_privilege(mz_sql::catalog::ObjectType::Cluster, MZ_SYSTEM_ROLE_ID),
    ];

    if let Some(role) = &bootstrap_role {
        cluster_privileges.push(MzAclItem {
            grantee: role.id.clone(),
            grantor: MZ_SYSTEM_ROLE_ID,
            acl_mode: rbac::all_object_privileges(SystemObjectType::Object(
                mz_sql::catalog::ObjectType::Cluster,
            )),
        });
    };

    // Create the default network policy from the static rule table, with a
    // hard-coded ID and OID.
    tx.insert_network_policy(
        DEFAULT_USER_NETWORK_POLICY_ID,
        DEFAULT_USER_NETWORK_POLICY_NAME.to_string(),
        DEFAULT_USER_NETWORK_POLICY_RULES
            .into_iter()
            .map(|(name, action, direction, ip_str)| NetworkPolicyRule {
                name: name.to_string(),
                action: action.clone(),
                direction: direction.clone(),
                address: PolicyAddress(
                    IpNet::from_str(ip_str).expect("default policy must provide valid ip"),
                ),
            })
            .collect::<Vec<NetworkPolicyRule>>(),
        DEFAULT_USER_NETWORK_POLICY_PRIVILEGES.clone(),
        MZ_SYSTEM_ROLE_ID,
        NETWORK_POLICIES_DEFAULT_POLICY_OID,
    )?;
    // The policy above was inserted with a hard-coded ID, so advance the
    // allocator past it and check that the ID it handed out is the one used.
    let id = tx.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
    assert!(DEFAULT_USER_NETWORK_POLICY_ID == NetworkPolicyId::User(id));

    audit_events.extend([(
        mz_audit_log::EventType::Create,
        mz_audit_log::ObjectType::NetworkPolicy,
        mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
            id: DEFAULT_USER_NETWORK_POLICY_ID.to_string(),
            name: DEFAULT_USER_NETWORK_POLICY_NAME.to_string(),
        }),
    )]);

    // Create the default user cluster, owned by mz_system.
    // NOTE(review): the `Vec::new()` argument is presumably the cluster's
    // list of introspection source indexes — confirm against
    // `insert_user_cluster`.
    tx.insert_user_cluster(
        DEFAULT_USER_CLUSTER_ID,
        DEFAULT_USER_CLUSTER_NAME,
        Vec::new(),
        MZ_SYSTEM_ROLE_ID,
        cluster_privileges,
        default_cluster_config(options)?,
        &HashSet::new(),
    )?;
    // As with the database, only the creation and the PUBLIC grant are
    // audited here.
    audit_events.extend([
        (
            mz_audit_log::EventType::Create,
            mz_audit_log::ObjectType::Cluster,
            mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                id: DEFAULT_USER_CLUSTER_ID.to_string(),
                name: DEFAULT_USER_CLUSTER_NAME.to_string(),
            }),
        ),
        (
            mz_audit_log::EventType::Grant,
            mz_audit_log::ObjectType::Cluster,
            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
                object_id: ObjectId::Cluster(DEFAULT_USER_CLUSTER_ID).to_string(),
                grantee_id: RoleId::Public.to_string(),
                grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
                privileges: AclMode::USAGE.to_string(),
            }),
        ),
    ]);

    // Audit the bootstrap role's grant on the cluster.
    if let Some(role) = &bootstrap_role {
        let role_id: RoleId = role.id.clone();
        audit_events.push((
            mz_audit_log::EventType::Grant,
            mz_audit_log::ObjectType::Cluster,
            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
                object_id: ObjectId::Cluster(DEFAULT_USER_CLUSTER_ID).to_string(),
                grantee_id: role_id.to_string(),
                grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
                privileges: rbac::all_object_privileges(SystemObjectType::Object(
                    mz_sql::catalog::ObjectType::Cluster,
                ))
                .to_string(),
            }),
        ));
    }

    // Create one replica per unit of replication factor. Replica IDs count up
    // from `DEFAULT_USER_REPLICA_ID`, and the names are "r1", "r2", and so
    // on.
    for i in 0..options.default_cluster_replication_factor {
        let replica_id = ReplicaId::User(DEFAULT_USER_REPLICA_ID.inner_id() + u64::from(i));
        let replica_name = format!("r{}", i + 1);
        tx.insert_cluster_replica_with_id(
            DEFAULT_USER_CLUSTER_ID,
            replica_id,
            &replica_name,
            default_replica_config(options)?,
            MZ_SYSTEM_ROLE_ID,
        )?;
        audit_events.push((
            mz_audit_log::EventType::Create,
            mz_audit_log::ObjectType::ClusterReplica,
            mz_audit_log::EventDetails::CreateClusterReplicaV2(
                mz_audit_log::CreateClusterReplicaV2 {
                    cluster_id: DEFAULT_USER_CLUSTER_ID.to_string(),
                    cluster_name: DEFAULT_USER_CLUSTER_NAME.to_string(),
                    replica_name,
                    replica_id: Some(replica_id.to_string()),
                    logical_size: options.default_cluster_replica_size.to_string(),
                    // Same lookup as in `default_replica_config`: `disk` is
                    // the `is_cc` flag of the size's allocation. A failed
                    // lookup returns early from `initialize`.
                    disk: {
                        let cluster_size = options.default_cluster_replica_size.to_string();
                        let cluster_allocation = options
                            .cluster_replica_size_map
                            .get_allocation_by_name(&cluster_size)?;
                        cluster_allocation.is_cc
                    },
                    billed_as: None,
                    internal: false,
                    reason: CreateOrDropClusterReplicaReasonV1::System,
                    scheduling_policies: None,
                },
            ),
        ));
    }

    // System-wide privileges: every system privilege for mz_system and, if
    // present, for the bootstrap role. This is a lazy iterator; it is cloned
    // below so that it can be consumed twice.
    let system_privileges = [MzAclItem {
        grantee: MZ_SYSTEM_ROLE_ID,
        grantor: MZ_SYSTEM_ROLE_ID,
        acl_mode: rbac::all_object_privileges(SystemObjectType::System),
    }]
    .into_iter()
    .chain(bootstrap_role.as_ref().map(|role| MzAclItem {
        grantee: role.id.clone(),
        grantor: MZ_SYSTEM_ROLE_ID,
        acl_mode: rbac::all_object_privileges(SystemObjectType::System),
    }));
    tx.set_system_privileges(system_privileges.clone().collect())?;
    for system_privilege in system_privileges {
        audit_events.push((
            mz_audit_log::EventType::Grant,
            mz_audit_log::ObjectType::System,
            mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
                object_id: "SYSTEM".to_string(),
                grantee_id: system_privilege.grantee.to_string(),
                grantor_id: system_privilege.grantor.to_string(),
                privileges: system_privilege.acl_mode.to_string(),
            }),
        ));
    }

    // First pass: allocate an audit log ID for every collected event, in the
    // order the events were recorded.
    let mut audit_events_with_id = Vec::with_capacity(audit_events.len());
    for (ty, obj, details) in audit_events {
        let id = tx.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())?;
        audit_events_with_id.push((id, ty, obj, details));
    }

    // Second pass: write the events. They have no associated user and are all
    // stamped with `initial_ts`.
    for (id, event_type, object_type, details) in audit_events_with_id {
        tx.insert_audit_log_event(VersionedEvent::V1(EventV1 {
            id,
            event_type,
            object_type,
            details,
            user: None,
            occurred_at: initial_ts,
        }));
    }

    // Initial numeric config: the catalog version, and the "system config
    // synced" flag set to 0.
    for (key, value) in [
        (USER_VERSION_KEY.to_string(), CATALOG_VERSION),
        (SYSTEM_CONFIG_SYNCED_KEY.to_string(), 0),
    ] {
        tx.insert_config(key, value)?;
    }

    // Initial string settings: the catalog content version passed in by the
    // caller, plus a newly constructed `ShardId` each for the builtin
    // migration shard and the expression cache shard.
    for (name, value) in [
        (
            CATALOG_CONTENT_VERSION_KEY.to_string(),
            catalog_content_version,
        ),
        (
            BUILTIN_MIGRATION_SHARD_KEY.to_string(),
            ShardId::new().to_string(),
        ),
        (
            EXPRESSION_CACHE_SHARD_KEY.to_string(),
            ShardId::new().to_string(),
        ),
    ] {
        tx.set_setting(name, Some(value))?;
    }

    Ok(())
}
776
777pub fn resolve_system_schema(name: &str) -> &Schema {
778 SYSTEM_SCHEMAS
779 .get(name)
780 .unwrap_or_else(|| panic!("unable to resolve system schema: {name}"))
781}
782
783fn default_cluster_config(args: &BootstrapArgs) -> Result<ClusterConfig, CatalogError> {
785 let cluster_size = args.default_cluster_replica_size.to_string();
786 let cluster_allocation = args
787 .cluster_replica_size_map
788 .get_allocation_by_name(&cluster_size)?;
789 Ok(ClusterConfig {
790 variant: ClusterVariant::Managed(ClusterVariantManaged {
791 size: cluster_size,
792 replication_factor: args.default_cluster_replication_factor,
793 availability_zones: vec![],
794 logging: ReplicaLogging {
795 log_logging: false,
796 interval: Some(Duration::from_secs(1)),
797 },
798 disk: cluster_allocation.is_cc,
799 optimizer_feature_overrides: Default::default(),
800 schedule: Default::default(),
801 }),
802 workload_class: None,
803 })
804}
805
806fn default_replica_config(args: &BootstrapArgs) -> Result<ReplicaConfig, CatalogError> {
808 let cluster_size = args.default_cluster_replica_size.to_string();
809 let cluster_allocation = args
810 .cluster_replica_size_map
811 .get_allocation_by_name(&cluster_size)?;
812 Ok(ReplicaConfig {
813 location: ReplicaLocation::Managed {
814 size: cluster_size,
815 availability_zone: None,
816 disk: cluster_allocation.is_cc,
817 internal: false,
818 billed_as: None,
819 pending: false,
820 },
821 logging: ReplicaLogging {
822 log_logging: false,
823 interval: Some(Duration::from_secs(1)),
824 },
825 })
826}