1use std::collections::BTreeMap;
11use std::iter;
12use std::str::FromStr;
13use std::sync::LazyLock;
14use std::time::Duration;
15
16use base64::prelude::*;
17use ipnet::IpNet;
18use itertools::max;
19use mz_audit_log::{CreateOrDropClusterReplicaReasonV1, EventV1, VersionedEvent};
20use mz_controller::clusters::ReplicaLogging;
21use mz_controller_types::{ClusterId, ReplicaId};
22use mz_ore::collections::HashSet;
23use mz_ore::now::EpochMillis;
24use mz_persist_types::ShardId;
25use mz_pgrepr::oid::{
26 FIRST_USER_OID, NETWORK_POLICIES_DEFAULT_POLICY_OID, ROLE_PUBLIC_OID,
27 SCHEMA_INFORMATION_SCHEMA_OID, SCHEMA_MZ_CATALOG_OID, SCHEMA_MZ_CATALOG_UNSTABLE_OID,
28 SCHEMA_MZ_INTERNAL_OID, SCHEMA_MZ_INTROSPECTION_OID, SCHEMA_MZ_UNSAFE_OID,
29 SCHEMA_PG_CATALOG_OID,
30};
31use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
32use mz_repr::network_policy_id::NetworkPolicyId;
33use mz_repr::role_id::RoleId;
34use mz_sql::catalog::{
35 DefaultPrivilegeAclItem, DefaultPrivilegeObject, ObjectType, RoleAttributesRaw, RoleMembership,
36 RoleVars, SystemObjectType,
37};
38use mz_sql::names::{
39 DatabaseId, ObjectId, PUBLIC_ROLE_NAME, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier,
40};
41use mz_sql::plan::{NetworkPolicyRule, PolicyAddress};
42use mz_sql::rbac;
43use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
44
45use crate::builtin::BUILTIN_ROLES;
46use crate::durable::upgrade::CATALOG_VERSION;
47use crate::durable::{
48 AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, BootstrapArgs,
49 CATALOG_CONTENT_VERSION_KEY, CatalogError, ClusterConfig, ClusterVariant,
50 ClusterVariantManaged, DATABASE_ID_ALLOC_KEY, DefaultPrivilege, EXPRESSION_CACHE_SHARD_KEY,
51 MOCK_AUTHENTICATION_NONCE_KEY, OID_ALLOC_KEY, ReplicaConfig, ReplicaLocation, Role,
52 SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY,
53 SYSTEM_REPLICA_ID_ALLOC_KEY, Schema, Transaction, USER_CLUSTER_ID_ALLOC_KEY,
54 USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY,
55};
56
57pub const USER_VERSION_KEY: &str = "user_version";
59pub(crate) const SYSTEM_CONFIG_SYNCED_KEY: &str = "system_config_synced";
62
63pub(crate) const WITH_0DT_DEPLOYMENT_MAX_WAIT: &str = "with_0dt_deployment_max_wait";
70
71pub(crate) const WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL: &str =
78 "with_0dt_deployment_ddl_check_interval";
79
80pub(crate) const ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT: &str =
85 "enable_0dt_deployment_panic_after_timeout";
86
87const USER_ID_ALLOC_KEY: &str = "user";
88const SYSTEM_ID_ALLOC_KEY: &str = "system";
89
90const DEFAULT_USER_CLUSTER_ID: ClusterId = ClusterId::User(1);
91const DEFAULT_USER_CLUSTER_NAME: &str = "quickstart";
92
93const DEFAULT_USER_REPLICA_ID: ReplicaId = ReplicaId::User(1);
94
95const MATERIALIZE_DATABASE_ID_VAL: u64 = 1;
96const MATERIALIZE_DATABASE_ID: DatabaseId = DatabaseId::User(MATERIALIZE_DATABASE_ID_VAL);
97
98const MZ_CATALOG_SCHEMA_ID: u64 = 1;
99const PG_CATALOG_SCHEMA_ID: u64 = 2;
100const PUBLIC_SCHEMA_ID: u64 = 3;
101const MZ_INTERNAL_SCHEMA_ID: u64 = 4;
102const INFORMATION_SCHEMA_ID: u64 = 5;
103pub const MZ_UNSAFE_SCHEMA_ID: u64 = 6;
104pub const MZ_CATALOG_UNSTABLE_SCHEMA_ID: u64 = 7;
105pub const MZ_INTROSPECTION_SCHEMA_ID: u64 = 8;
106
107const DEFAULT_ALLOCATOR_ID: u64 = 1;
108
109pub const DEFAULT_USER_NETWORK_POLICY_ID: NetworkPolicyId = NetworkPolicyId::User(1);
110pub const DEFAULT_USER_NETWORK_POLICY_NAME: &str = "default";
111pub const DEFAULT_USER_NETWORK_POLICY_RULES: &[(
112 &str,
113 mz_sql::plan::NetworkPolicyRuleAction,
114 mz_sql::plan::NetworkPolicyRuleDirection,
115 &str,
116)] = &[(
117 "open_ingress",
118 mz_sql::plan::NetworkPolicyRuleAction::Allow,
119 mz_sql::plan::NetworkPolicyRuleDirection::Ingress,
120 "0.0.0.0/0",
121)];
122
123static DEFAULT_USER_NETWORK_POLICY_PRIVILEGES: LazyLock<Vec<MzAclItem>> = LazyLock::new(|| {
124 vec![rbac::owner_privilege(
125 ObjectType::NetworkPolicy,
126 MZ_SYSTEM_ROLE_ID,
127 )]
128});
129
130static SYSTEM_SCHEMA_PRIVILEGES: LazyLock<Vec<MzAclItem>> = LazyLock::new(|| {
131 vec![
132 rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Schema),
133 MzAclItem {
134 grantee: MZ_SUPPORT_ROLE_ID,
135 grantor: MZ_SYSTEM_ROLE_ID,
136 acl_mode: AclMode::USAGE,
137 },
138 rbac::owner_privilege(mz_sql::catalog::ObjectType::Schema, MZ_SYSTEM_ROLE_ID),
139 ]
140});
141
142static MZ_CATALOG_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
143 id: SchemaId::System(MZ_CATALOG_SCHEMA_ID),
144 oid: SCHEMA_MZ_CATALOG_OID,
145 database_id: None,
146 name: "mz_catalog".to_string(),
147 owner_id: MZ_SYSTEM_ROLE_ID,
148 privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
149});
150static PG_CATALOG_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
151 id: SchemaId::System(PG_CATALOG_SCHEMA_ID),
152 oid: SCHEMA_PG_CATALOG_OID,
153 database_id: None,
154 name: "pg_catalog".to_string(),
155 owner_id: MZ_SYSTEM_ROLE_ID,
156 privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
157});
158static MZ_INTERNAL_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
159 id: SchemaId::System(MZ_INTERNAL_SCHEMA_ID),
160 oid: SCHEMA_MZ_INTERNAL_OID,
161 database_id: None,
162 name: "mz_internal".to_string(),
163 owner_id: MZ_SYSTEM_ROLE_ID,
164 privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
165});
166static INFORMATION_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
167 id: SchemaId::System(INFORMATION_SCHEMA_ID),
168 oid: SCHEMA_INFORMATION_SCHEMA_OID,
169 database_id: None,
170 name: "information_schema".to_string(),
171 owner_id: MZ_SYSTEM_ROLE_ID,
172 privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
173});
174static MZ_UNSAFE_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
175 id: SchemaId::System(MZ_UNSAFE_SCHEMA_ID),
176 oid: SCHEMA_MZ_UNSAFE_OID,
177 database_id: None,
178 name: "mz_unsafe".to_string(),
179 owner_id: MZ_SYSTEM_ROLE_ID,
180 privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
181});
182static MZ_CATALOG_UNSTABLE_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
183 id: SchemaId::System(MZ_CATALOG_UNSTABLE_SCHEMA_ID),
184 oid: SCHEMA_MZ_CATALOG_UNSTABLE_OID,
185 database_id: None,
186 name: "mz_catalog_unstable".to_string(),
187 owner_id: MZ_SYSTEM_ROLE_ID,
188 privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
189});
190static MZ_INTROSPECTION_SCHEMA: LazyLock<Schema> = LazyLock::new(|| Schema {
191 id: SchemaId::System(MZ_INTROSPECTION_SCHEMA_ID),
192 oid: SCHEMA_MZ_INTROSPECTION_OID,
193 database_id: None,
194 name: "mz_introspection".to_string(),
195 owner_id: MZ_SYSTEM_ROLE_ID,
196 privileges: SYSTEM_SCHEMA_PRIVILEGES.clone(),
197});
198static SYSTEM_SCHEMAS: LazyLock<BTreeMap<&str, &Schema>> = LazyLock::new(|| {
199 [
200 &*MZ_CATALOG_SCHEMA,
201 &*PG_CATALOG_SCHEMA,
202 &*MZ_INTERNAL_SCHEMA,
203 &*INFORMATION_SCHEMA,
204 &*MZ_UNSAFE_SCHEMA,
205 &*MZ_CATALOG_UNSTABLE_SCHEMA,
206 &*MZ_INTROSPECTION_SCHEMA,
207 ]
208 .into_iter()
209 .map(|s| (&*s.name, s))
210 .collect()
211});
212
213#[mz_ore::instrument]
215pub(crate) async fn initialize(
216 tx: &mut Transaction<'_>,
217 options: &BootstrapArgs,
218 initial_ts: EpochMillis,
219 catalog_content_version: String,
220) -> Result<(), CatalogError> {
221 let mut audit_events = vec![];
223
224 for (name, next_id) in [
225 (USER_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
226 (SYSTEM_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
227 (
228 DATABASE_ID_ALLOC_KEY.to_string(),
229 MATERIALIZE_DATABASE_ID_VAL + 1,
230 ),
231 (
232 SCHEMA_ID_ALLOC_KEY.to_string(),
233 max(&[
234 MZ_CATALOG_SCHEMA_ID,
235 PG_CATALOG_SCHEMA_ID,
236 PUBLIC_SCHEMA_ID,
237 MZ_INTERNAL_SCHEMA_ID,
238 INFORMATION_SCHEMA_ID,
239 MZ_UNSAFE_SCHEMA_ID,
240 MZ_CATALOG_UNSTABLE_SCHEMA_ID,
241 MZ_INTROSPECTION_SCHEMA_ID,
242 ])
243 .expect("known to be non-empty")
244 + 1,
245 ),
246 (USER_ROLE_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
247 (
248 USER_CLUSTER_ID_ALLOC_KEY.to_string(),
249 DEFAULT_USER_CLUSTER_ID.inner_id() + 1,
250 ),
251 (
252 SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string(),
253 DEFAULT_ALLOCATOR_ID,
254 ),
255 (
256 USER_REPLICA_ID_ALLOC_KEY.to_string(),
257 DEFAULT_USER_REPLICA_ID.inner_id()
258 + u64::from(options.default_cluster_replication_factor),
259 ),
260 (
261 SYSTEM_REPLICA_ID_ALLOC_KEY.to_string(),
262 DEFAULT_ALLOCATOR_ID,
263 ),
264 (
265 USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string(),
266 DEFAULT_ALLOCATOR_ID,
267 ),
268 (AUDIT_LOG_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
269 (STORAGE_USAGE_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID),
270 (OID_ALLOC_KEY.to_string(), FIRST_USER_OID.into()),
271 ] {
272 tx.insert_id_allocator(name, next_id)?;
273 }
274
275 for role in BUILTIN_ROLES {
276 tx.insert_builtin_role(
277 role.id,
278 role.name.to_string(),
279 role.attributes.clone(),
280 RoleMembership::new(),
281 RoleVars::default(),
282 role.oid,
283 )?;
284 }
285 tx.insert_builtin_role(
286 RoleId::Public,
287 PUBLIC_ROLE_NAME.as_str().to_lowercase(),
288 RoleAttributesRaw::new(),
289 RoleMembership::new(),
290 RoleVars::default(),
291 ROLE_PUBLIC_OID,
292 )?;
293
294 let bootstrap_role = if let Some(role) = &options.bootstrap_role {
296 let attributes = RoleAttributesRaw::new();
297 let membership = RoleMembership::new();
298 let vars = RoleVars::default();
299
300 let (id, oid) = tx.insert_user_role(
301 role.to_string(),
302 attributes.clone(),
303 membership.clone(),
304 vars.clone(),
305 &HashSet::new(),
306 )?;
307
308 audit_events.push((
309 mz_audit_log::EventType::Create,
310 mz_audit_log::ObjectType::Role,
311 mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
312 id: id.to_string(),
313 name: role.to_string(),
314 }),
315 ));
316
317 Some(Role {
318 id,
319 name: role.to_string(),
320 attributes: attributes.into(),
321 membership,
322 vars,
323 oid,
324 })
325 } else {
326 None
327 };
328
329 let default_privileges = [
330 DefaultPrivilege {
333 object: DefaultPrivilegeObject {
334 role_id: RoleId::Public,
335 database_id: None,
336 schema_id: None,
337 object_type: mz_sql::catalog::ObjectType::Cluster,
338 },
339 acl_item: DefaultPrivilegeAclItem {
340 grantee: MZ_SUPPORT_ROLE_ID,
341 acl_mode: AclMode::USAGE,
342 },
343 },
344 DefaultPrivilege {
345 object: DefaultPrivilegeObject {
346 role_id: RoleId::Public,
347 database_id: None,
348 schema_id: None,
349 object_type: mz_sql::catalog::ObjectType::Database,
350 },
351 acl_item: DefaultPrivilegeAclItem {
352 grantee: MZ_SUPPORT_ROLE_ID,
353 acl_mode: AclMode::USAGE,
354 },
355 },
356 DefaultPrivilege {
357 object: DefaultPrivilegeObject {
358 role_id: RoleId::Public,
359 database_id: None,
360 schema_id: None,
361 object_type: mz_sql::catalog::ObjectType::Schema,
362 },
363 acl_item: DefaultPrivilegeAclItem {
364 grantee: MZ_SUPPORT_ROLE_ID,
365 acl_mode: AclMode::USAGE,
366 },
367 },
368 DefaultPrivilege {
369 object: DefaultPrivilegeObject {
370 role_id: RoleId::Public,
371 database_id: None,
372 schema_id: None,
373 object_type: mz_sql::catalog::ObjectType::Type,
374 },
375 acl_item: DefaultPrivilegeAclItem {
376 grantee: RoleId::Public,
377 acl_mode: AclMode::USAGE,
378 },
379 },
380 ];
381 tx.set_default_privileges(default_privileges.to_vec())?;
382 for DefaultPrivilege { object, acl_item } in default_privileges {
383 let object_type = match object.object_type {
384 ObjectType::Table => mz_audit_log::ObjectType::Table,
385 ObjectType::View => mz_audit_log::ObjectType::View,
386 ObjectType::MaterializedView => mz_audit_log::ObjectType::MaterializedView,
387 ObjectType::Source => mz_audit_log::ObjectType::Source,
388 ObjectType::Sink => mz_audit_log::ObjectType::Sink,
389 ObjectType::Index => mz_audit_log::ObjectType::Index,
390 ObjectType::Type => mz_audit_log::ObjectType::Type,
391 ObjectType::Role => mz_audit_log::ObjectType::Role,
392 ObjectType::Cluster => mz_audit_log::ObjectType::Cluster,
393 ObjectType::ClusterReplica => mz_audit_log::ObjectType::ClusterReplica,
394 ObjectType::Secret => mz_audit_log::ObjectType::Secret,
395 ObjectType::Connection => mz_audit_log::ObjectType::Connection,
396 ObjectType::Database => mz_audit_log::ObjectType::Database,
397 ObjectType::Schema => mz_audit_log::ObjectType::Schema,
398 ObjectType::Func => mz_audit_log::ObjectType::Func,
399 ObjectType::ContinualTask => mz_audit_log::ObjectType::ContinualTask,
400 ObjectType::NetworkPolicy => mz_audit_log::ObjectType::NetworkPolicy,
401 };
402 audit_events.push((
403 mz_audit_log::EventType::Grant,
404 object_type,
405 mz_audit_log::EventDetails::AlterDefaultPrivilegeV1(
406 mz_audit_log::AlterDefaultPrivilegeV1 {
407 role_id: object.role_id.to_string(),
408 database_id: object.database_id.map(|id| id.to_string()),
409 schema_id: object.schema_id.map(|id| id.to_string()),
410 grantee_id: acl_item.grantee.to_string(),
411 privileges: acl_item.acl_mode.to_string(),
412 },
413 ),
414 ));
415 }
416
417 let mut db_privileges = vec![
418 MzAclItem {
419 grantee: RoleId::Public,
420 grantor: MZ_SYSTEM_ROLE_ID,
421 acl_mode: AclMode::USAGE,
422 },
423 MzAclItem {
424 grantee: MZ_SUPPORT_ROLE_ID,
425 grantor: MZ_SYSTEM_ROLE_ID,
426 acl_mode: AclMode::USAGE,
427 },
428 rbac::owner_privilege(mz_sql::catalog::ObjectType::Database, MZ_SYSTEM_ROLE_ID),
429 ];
430 if let Some(role) = &bootstrap_role {
432 db_privileges.push(MzAclItem {
433 grantee: role.id.clone(),
434 grantor: MZ_SYSTEM_ROLE_ID,
435 acl_mode: rbac::all_object_privileges(SystemObjectType::Object(
436 mz_sql::catalog::ObjectType::Database,
437 )),
438 })
439 };
440
441 let materialize_db_oid = tx.allocate_oid(&HashSet::new())?;
442 tx.insert_database(
443 MATERIALIZE_DATABASE_ID,
444 "materialize",
445 MZ_SYSTEM_ROLE_ID,
446 db_privileges,
447 materialize_db_oid,
448 )?;
449 audit_events.extend([
450 (
451 mz_audit_log::EventType::Create,
452 mz_audit_log::ObjectType::Database,
453 mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
454 id: MATERIALIZE_DATABASE_ID.to_string(),
455 name: "materialize".to_string(),
456 }),
457 ),
458 (
459 mz_audit_log::EventType::Grant,
460 mz_audit_log::ObjectType::Database,
461 mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
462 object_id: ObjectId::Database(MATERIALIZE_DATABASE_ID).to_string(),
463 grantee_id: RoleId::Public.to_string(),
464 grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
465 privileges: AclMode::USAGE.to_string(),
466 }),
467 ),
468 ]);
469 if let Some(role) = &bootstrap_role {
471 let role_id: RoleId = role.id.clone();
472 audit_events.push((
473 mz_audit_log::EventType::Grant,
474 mz_audit_log::ObjectType::Database,
475 mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
476 object_id: ObjectId::Database(MATERIALIZE_DATABASE_ID).to_string(),
477 grantee_id: role_id.to_string(),
478 grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
479 privileges: rbac::all_object_privileges(SystemObjectType::Object(
480 mz_sql::catalog::ObjectType::Database,
481 ))
482 .to_string(),
483 }),
484 ));
485 }
486
487 let public_schema_oid = tx.allocate_oid(&HashSet::new())?;
488 let public_schema = Schema {
489 id: SchemaId::User(PUBLIC_SCHEMA_ID),
490 oid: public_schema_oid,
491 database_id: Some(MATERIALIZE_DATABASE_ID),
492 name: "public".to_string(),
493 owner_id: MZ_SYSTEM_ROLE_ID,
494 privileges: vec![
495 MzAclItem {
496 grantee: RoleId::Public,
497 grantor: MZ_SYSTEM_ROLE_ID,
498 acl_mode: AclMode::USAGE,
499 },
500 MzAclItem {
501 grantee: MZ_SUPPORT_ROLE_ID,
502 grantor: MZ_SYSTEM_ROLE_ID,
503 acl_mode: AclMode::USAGE,
504 },
505 rbac::owner_privilege(mz_sql::catalog::ObjectType::Schema, MZ_SYSTEM_ROLE_ID),
506 ]
507 .into_iter()
508 .chain(bootstrap_role.as_ref().map(|role| MzAclItem {
510 grantee: role.id.clone(),
511 grantor: MZ_SYSTEM_ROLE_ID,
512 acl_mode: rbac::all_object_privileges(SystemObjectType::Object(
513 mz_sql::catalog::ObjectType::Schema,
514 )),
515 }))
516 .collect(),
517 };
518
519 for schema in SYSTEM_SCHEMAS.values().chain(iter::once(&&public_schema)) {
520 tx.insert_schema(
521 schema.id,
522 schema.database_id,
523 schema.name.clone(),
524 schema.owner_id,
525 schema.privileges.clone(),
526 schema.oid,
527 )?;
528 }
529 audit_events.push((
530 mz_audit_log::EventType::Create,
531 mz_audit_log::ObjectType::Schema,
532 mz_audit_log::EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
533 id: PUBLIC_SCHEMA_ID.to_string(),
534 name: "public".to_string(),
535 database_name: Some("materialize".to_string()),
536 }),
537 ));
538 if let Some(role) = &bootstrap_role {
539 let role_id: RoleId = role.id.clone();
540 audit_events.push((
541 mz_audit_log::EventType::Grant,
542 mz_audit_log::ObjectType::Schema,
543 mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
544 object_id: ObjectId::Schema((
545 ResolvedDatabaseSpecifier::Id(MATERIALIZE_DATABASE_ID),
546 SchemaSpecifier::Id(SchemaId::User(PUBLIC_SCHEMA_ID)),
547 ))
548 .to_string(),
549 grantee_id: role_id.to_string(),
550 grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
551 privileges: rbac::all_object_privileges(SystemObjectType::Object(
552 mz_sql::catalog::ObjectType::Schema,
553 ))
554 .to_string(),
555 }),
556 ));
557 }
558
559 let mut cluster_privileges = vec![
560 MzAclItem {
561 grantee: RoleId::Public,
562 grantor: MZ_SYSTEM_ROLE_ID,
563 acl_mode: AclMode::USAGE,
564 },
565 MzAclItem {
566 grantee: MZ_SUPPORT_ROLE_ID,
567 grantor: MZ_SYSTEM_ROLE_ID,
568 acl_mode: AclMode::USAGE,
569 },
570 rbac::owner_privilege(mz_sql::catalog::ObjectType::Cluster, MZ_SYSTEM_ROLE_ID),
571 ];
572
573 if let Some(role) = &bootstrap_role {
575 cluster_privileges.push(MzAclItem {
576 grantee: role.id.clone(),
577 grantor: MZ_SYSTEM_ROLE_ID,
578 acl_mode: rbac::all_object_privileges(SystemObjectType::Object(
579 mz_sql::catalog::ObjectType::Cluster,
580 )),
581 });
582 };
583
584 tx.insert_network_policy(
585 DEFAULT_USER_NETWORK_POLICY_ID,
586 DEFAULT_USER_NETWORK_POLICY_NAME.to_string(),
587 DEFAULT_USER_NETWORK_POLICY_RULES
588 .into_iter()
589 .map(|(name, action, direction, ip_str)| NetworkPolicyRule {
590 name: name.to_string(),
591 action: action.clone(),
592 direction: direction.clone(),
593 address: PolicyAddress(
594 IpNet::from_str(ip_str).expect("default policy must provide valid ip"),
595 ),
596 })
597 .collect::<Vec<NetworkPolicyRule>>(),
598 DEFAULT_USER_NETWORK_POLICY_PRIVILEGES.clone(),
599 MZ_SYSTEM_ROLE_ID,
600 NETWORK_POLICIES_DEFAULT_POLICY_OID,
601 )?;
602 let id = tx.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
606 assert!(DEFAULT_USER_NETWORK_POLICY_ID == NetworkPolicyId::User(id));
607
608 audit_events.extend([(
609 mz_audit_log::EventType::Create,
610 mz_audit_log::ObjectType::NetworkPolicy,
611 mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
612 id: DEFAULT_USER_NETWORK_POLICY_ID.to_string(),
613 name: DEFAULT_USER_NETWORK_POLICY_NAME.to_string(),
614 }),
615 )]);
616
617 tx.insert_user_cluster(
618 DEFAULT_USER_CLUSTER_ID,
619 DEFAULT_USER_CLUSTER_NAME,
620 Vec::new(),
621 MZ_SYSTEM_ROLE_ID,
622 cluster_privileges,
623 default_cluster_config(options)?,
624 &HashSet::new(),
625 )?;
626 audit_events.extend([
627 (
628 mz_audit_log::EventType::Create,
629 mz_audit_log::ObjectType::Cluster,
630 mz_audit_log::EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
631 id: DEFAULT_USER_CLUSTER_ID.to_string(),
632 name: DEFAULT_USER_CLUSTER_NAME.to_string(),
633 }),
634 ),
635 (
636 mz_audit_log::EventType::Grant,
637 mz_audit_log::ObjectType::Cluster,
638 mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
639 object_id: ObjectId::Cluster(DEFAULT_USER_CLUSTER_ID).to_string(),
640 grantee_id: RoleId::Public.to_string(),
641 grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
642 privileges: AclMode::USAGE.to_string(),
643 }),
644 ),
645 ]);
646
647 if let Some(role) = &bootstrap_role {
649 let role_id: RoleId = role.id.clone();
650 audit_events.push((
651 mz_audit_log::EventType::Grant,
652 mz_audit_log::ObjectType::Cluster,
653 mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
654 object_id: ObjectId::Cluster(DEFAULT_USER_CLUSTER_ID).to_string(),
655 grantee_id: role_id.to_string(),
656 grantor_id: MZ_SYSTEM_ROLE_ID.to_string(),
657 privileges: rbac::all_object_privileges(SystemObjectType::Object(
658 mz_sql::catalog::ObjectType::Cluster,
659 ))
660 .to_string(),
661 }),
662 ));
663 }
664
665 for i in 0..options.default_cluster_replication_factor {
666 let replica_id = ReplicaId::User(DEFAULT_USER_REPLICA_ID.inner_id() + u64::from(i));
667 let replica_name = format!("r{}", i + 1);
668 tx.insert_cluster_replica_with_id(
669 DEFAULT_USER_CLUSTER_ID,
670 replica_id,
671 &replica_name,
672 default_replica_config(options)?,
673 MZ_SYSTEM_ROLE_ID,
674 )?;
675 audit_events.push((
676 mz_audit_log::EventType::Create,
677 mz_audit_log::ObjectType::ClusterReplica,
678 mz_audit_log::EventDetails::CreateClusterReplicaV4(
679 mz_audit_log::CreateClusterReplicaV4 {
680 cluster_id: DEFAULT_USER_CLUSTER_ID.to_string(),
681 cluster_name: DEFAULT_USER_CLUSTER_NAME.to_string(),
682 replica_name,
683 replica_id: Some(replica_id.to_string()),
684 logical_size: options.default_cluster_replica_size.to_string(),
685 billed_as: None,
686 internal: false,
687 reason: CreateOrDropClusterReplicaReasonV1::System,
688 scheduling_policies: None,
689 },
690 ),
691 ));
692 }
693
694 let system_privileges = [MzAclItem {
695 grantee: MZ_SYSTEM_ROLE_ID,
696 grantor: MZ_SYSTEM_ROLE_ID,
697 acl_mode: rbac::all_object_privileges(SystemObjectType::System),
698 }]
699 .into_iter()
700 .chain(bootstrap_role.as_ref().map(|role| MzAclItem {
702 grantee: role.id.clone(),
703 grantor: MZ_SYSTEM_ROLE_ID,
704 acl_mode: rbac::all_object_privileges(SystemObjectType::System),
705 }));
706 tx.set_system_privileges(system_privileges.clone().collect())?;
707 for system_privilege in system_privileges {
708 audit_events.push((
709 mz_audit_log::EventType::Grant,
710 mz_audit_log::ObjectType::System,
711 mz_audit_log::EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
712 object_id: "SYSTEM".to_string(),
713 grantee_id: system_privilege.grantee.to_string(),
714 grantor_id: system_privilege.grantor.to_string(),
715 privileges: system_privilege.acl_mode.to_string(),
716 }),
717 ));
718 }
719
720 let mut audit_events_with_id = Vec::with_capacity(audit_events.len());
722 for (ty, obj, details) in audit_events {
723 let id = tx.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())?;
724 audit_events_with_id.push((id, ty, obj, details));
725 }
726
727 for (id, event_type, object_type, details) in audit_events_with_id {
728 tx.insert_audit_log_event(VersionedEvent::V1(EventV1 {
729 id,
730 event_type,
731 object_type,
732 details,
733 user: None,
734 occurred_at: initial_ts,
735 }));
736 }
737
738 for (key, value) in [
739 (USER_VERSION_KEY.to_string(), CATALOG_VERSION),
740 (SYSTEM_CONFIG_SYNCED_KEY.to_string(), 0),
741 ] {
742 tx.insert_config(key, value)?;
743 }
744
745 for (name, value) in [
746 (
747 CATALOG_CONTENT_VERSION_KEY.to_string(),
748 catalog_content_version,
749 ),
750 (
751 BUILTIN_MIGRATION_SHARD_KEY.to_string(),
752 ShardId::new().to_string(),
753 ),
754 (
755 EXPRESSION_CACHE_SHARD_KEY.to_string(),
756 ShardId::new().to_string(),
757 ),
758 ] {
759 tx.set_setting(name, Some(value))?;
760 }
761
762 if tx
763 .get_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string())
764 .is_none()
765 {
766 let mut nonce = [0u8; 24];
767 openssl::rand::rand_bytes(&mut nonce).expect("random number generation failed");
768 tx.set_setting(
769 MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
770 Some(BASE64_STANDARD.encode(nonce)),
771 )?;
772 }
773
774 Ok(())
775}
776
777pub fn resolve_system_schema(name: &str) -> &Schema {
778 SYSTEM_SCHEMAS
779 .get(name)
780 .unwrap_or_else(|| panic!("unable to resolve system schema: {name}"))
781}
782
783fn default_cluster_config(args: &BootstrapArgs) -> Result<ClusterConfig, CatalogError> {
785 Ok(ClusterConfig {
786 variant: ClusterVariant::Managed(ClusterVariantManaged {
787 size: args.default_cluster_replica_size.to_string(),
788 replication_factor: args.default_cluster_replication_factor,
789 availability_zones: vec![],
790 logging: ReplicaLogging {
791 log_logging: false,
792 interval: Some(Duration::from_secs(1)),
793 },
794 optimizer_feature_overrides: Default::default(),
795 schedule: Default::default(),
796 }),
797 workload_class: None,
798 })
799}
800
801fn default_replica_config(args: &BootstrapArgs) -> Result<ReplicaConfig, CatalogError> {
803 Ok(ReplicaConfig {
804 location: ReplicaLocation::Managed {
805 size: args.default_cluster_replica_size.to_string(),
806 availability_zone: None,
807 internal: false,
808 billed_as: None,
809 pending: false,
810 },
811 logging: ReplicaLogging {
812 log_logging: false,
813 interval: Some(Duration::from_secs(1)),
814 },
815 })
816}