1use std::collections::{BTreeMap, BTreeSet};
13use std::sync::Arc;
14use std::time::Duration;
15
16use itertools::Itertools;
17use mz_adapter_types::compaction::CompactionWindow;
18use mz_adapter_types::connection::ConnectionId;
19use mz_adapter_types::dyncfgs::{
20 ENABLE_0DT_DEPLOYMENT, ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT,
21 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
22};
23use mz_audit_log::{
24 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
25 ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
26};
27use mz_catalog::SYSTEM_CONN_ID;
28use mz_catalog::builtin::BuiltinLog;
29use mz_catalog::durable::{NetworkPolicy, Transaction};
30use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
31use mz_catalog::memory::objects::{
32 CatalogItem, ClusterConfig, DataSourceDesc, SourceReferences, StateDiff, StateUpdate,
33 StateUpdateKind, TemporaryItem,
34};
35use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
36use mz_controller_types::{ClusterId, ReplicaId};
37use mz_ore::collections::HashSet;
38use mz_ore::instrument;
39use mz_ore::now::EpochMillis;
40use mz_persist_types::ShardId;
41use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
42use mz_repr::network_policy_id::NetworkPolicyId;
43use mz_repr::role_id::RoleId;
44use mz_repr::{CatalogItemId, ColumnName, ColumnType, Diff, GlobalId, strconv};
45use mz_sql::ast::RawDataType;
46use mz_sql::catalog::{
47 CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
48 CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, RoleAttributes, RoleMembership,
49 RoleVars,
50};
51use mz_sql::names::{
52 CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
53 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
54};
55use mz_sql::plan::{NetworkPolicyRule, PlanError};
56use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
57use mz_sql::session::vars::OwnedVarInput;
58use mz_sql::session::vars::{Value as VarValue, VarInput};
59use mz_sql::{DEFAULT_SCHEMA, rbac};
60use mz_sql_parser::ast::{QualifiedReplica, Value};
61use mz_storage_client::storage_collections::StorageCollections;
62use tracing::{info, trace};
63
64use crate::AdapterError;
65use crate::catalog::{
66 BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
67 catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
68 is_reserved_role_name, object_type_to_audit_object_type,
69 system_object_type_to_audit_object_type,
70};
71use crate::coord::ConnMeta;
72use crate::coord::cluster_scheduling::SchedulingDecision;
73use crate::util::ResultExt;
74
75#[derive(Debug, Clone)]
76pub enum Op {
77 AlterRetainHistory {
78 id: CatalogItemId,
79 value: Option<Value>,
80 window: CompactionWindow,
81 },
82 AlterRole {
83 id: RoleId,
84 name: String,
85 attributes: RoleAttributes,
86 vars: RoleVars,
87 },
88 AlterNetworkPolicy {
89 id: NetworkPolicyId,
90 rules: Vec<NetworkPolicyRule>,
91 name: String,
92 owner_id: RoleId,
93 },
94 AlterAddColumn {
95 id: CatalogItemId,
96 new_global_id: GlobalId,
97 name: ColumnName,
98 typ: ColumnType,
99 sql: RawDataType,
100 },
101 CreateDatabase {
102 name: String,
103 owner_id: RoleId,
104 },
105 CreateSchema {
106 database_id: ResolvedDatabaseSpecifier,
107 schema_name: String,
108 owner_id: RoleId,
109 },
110 CreateRole {
111 name: String,
112 attributes: RoleAttributes,
113 },
114 CreateCluster {
115 id: ClusterId,
116 name: String,
117 introspection_sources: Vec<&'static BuiltinLog>,
118 owner_id: RoleId,
119 config: ClusterConfig,
120 },
121 CreateClusterReplica {
122 cluster_id: ClusterId,
123 name: String,
124 config: ReplicaConfig,
125 owner_id: RoleId,
126 reason: ReplicaCreateDropReason,
127 },
128 CreateItem {
129 id: CatalogItemId,
130 name: QualifiedItemName,
131 item: CatalogItem,
132 owner_id: RoleId,
133 },
134 CreateNetworkPolicy {
135 rules: Vec<NetworkPolicyRule>,
136 name: String,
137 owner_id: RoleId,
138 },
139 Comment {
140 object_id: CommentObjectId,
141 sub_component: Option<usize>,
142 comment: Option<String>,
143 },
144 DropObjects(Vec<DropObjectInfo>),
145 GrantRole {
146 role_id: RoleId,
147 member_id: RoleId,
148 grantor_id: RoleId,
149 },
150 RenameCluster {
151 id: ClusterId,
152 name: String,
153 to_name: String,
154 check_reserved_names: bool,
155 },
156 RenameClusterReplica {
157 cluster_id: ClusterId,
158 replica_id: ReplicaId,
159 name: QualifiedReplica,
160 to_name: String,
161 },
162 RenameItem {
163 id: CatalogItemId,
164 current_full_name: FullItemName,
165 to_name: String,
166 },
167 RenameSchema {
168 database_spec: ResolvedDatabaseSpecifier,
169 schema_spec: SchemaSpecifier,
170 new_name: String,
171 check_reserved_names: bool,
172 },
173 UpdateOwner {
174 id: ObjectId,
175 new_owner: RoleId,
176 },
177 UpdatePrivilege {
178 target_id: SystemObjectId,
179 privilege: MzAclItem,
180 variant: UpdatePrivilegeVariant,
181 },
182 UpdateDefaultPrivilege {
183 privilege_object: DefaultPrivilegeObject,
184 privilege_acl_item: DefaultPrivilegeAclItem,
185 variant: UpdatePrivilegeVariant,
186 },
187 RevokeRole {
188 role_id: RoleId,
189 member_id: RoleId,
190 grantor_id: RoleId,
191 },
192 UpdateClusterConfig {
193 id: ClusterId,
194 name: String,
195 config: ClusterConfig,
196 },
197 UpdateClusterReplicaConfig {
198 cluster_id: ClusterId,
199 replica_id: ReplicaId,
200 config: ReplicaConfig,
201 },
202 UpdateItem {
203 id: CatalogItemId,
204 name: QualifiedItemName,
205 to_item: CatalogItem,
206 },
207 UpdateSourceReferences {
208 source_id: CatalogItemId,
209 references: SourceReferences,
210 },
211 UpdateSystemConfiguration {
212 name: String,
213 value: OwnedVarInput,
214 },
215 ResetSystemConfiguration {
216 name: String,
217 },
218 ResetAllSystemConfiguration,
219 WeirdStorageUsageUpdates {
226 object_id: Option<String>,
227 size_bytes: u64,
228 collection_timestamp: EpochMillis,
229 },
230 TransactionDryRun,
236}
237
238#[derive(Debug, Clone)]
242pub enum DropObjectInfo {
243 Cluster(ClusterId),
244 ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
245 Database(DatabaseId),
246 Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
247 Role(RoleId),
248 Item(CatalogItemId),
249 NetworkPolicy(NetworkPolicyId),
250}
251
252impl DropObjectInfo {
253 pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
256 match id {
257 ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
258 ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
259 cluster_id,
260 replica_id,
261 ReplicaCreateDropReason::Manual,
262 )),
263 ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
264 ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
265 ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
266 ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
267 ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
268 }
269 }
270
271 fn to_object_id(&self) -> ObjectId {
274 match &self {
275 DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
276 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
277 ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
278 }
279 DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
280 DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
281 DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
282 DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
283 DropObjectInfo::NetworkPolicy(network_policy_id) => {
284 ObjectId::NetworkPolicy(network_policy_id.clone())
285 }
286 }
287 }
288}
289
290#[derive(Debug, Clone)]
292pub enum ReplicaCreateDropReason {
293 Manual,
298 ClusterScheduling(Vec<SchedulingDecision>),
301}
302
303impl ReplicaCreateDropReason {
304 pub fn into_audit_log(
305 self,
306 ) -> (
307 CreateOrDropClusterReplicaReasonV1,
308 Option<SchedulingDecisionsWithReasonsV2>,
309 ) {
310 let (reason, scheduling_policies) = match self {
311 ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
312 ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
313 CreateOrDropClusterReplicaReasonV1::Schedule,
314 Some(scheduling_decisions),
315 ),
316 };
317 (
318 reason,
319 scheduling_policies
320 .as_ref()
321 .map(SchedulingDecision::reasons_to_audit_log_reasons),
322 )
323 }
324}
325
326pub struct TransactionResult {
327 pub builtin_table_updates: Vec<BuiltinTableUpdate>,
328 pub audit_events: Vec<VersionedEvent>,
329}
330
331impl Catalog {
332 fn should_audit_log_item(item: &CatalogItem) -> bool {
333 !item.is_temporary()
334 }
335
336 fn temporary_ids(
339 &self,
340 ops: &[Op],
341 temporary_drops: BTreeSet<(&ConnectionId, String)>,
342 ) -> Result<BTreeSet<CatalogItemId>, Error> {
343 let mut creating = BTreeSet::new();
344 let mut temporary_ids = BTreeSet::new();
345 for op in ops.iter() {
346 if let Op::CreateItem {
347 id,
348 name,
349 item,
350 owner_id: _,
351 } = op
352 {
353 if let Some(conn_id) = item.conn_id() {
354 if self.item_exists_in_temp_schemas(conn_id, &name.item)
355 && !temporary_drops.contains(&(conn_id, name.item.clone()))
356 || creating.contains(&(conn_id, &name.item))
357 {
358 return Err(
359 SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
360 );
361 } else {
362 creating.insert((conn_id, &name.item));
363 temporary_ids.insert(id.clone());
364 }
365 }
366 }
367 }
368 Ok(temporary_ids)
369 }
370
371 #[instrument(name = "catalog::transact")]
372 pub async fn transact(
373 &mut self,
374 storage_collections: Option<
377 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
378 >,
379 oracle_write_ts: mz_repr::Timestamp,
380 session: Option<&ConnMeta>,
381 ops: Vec<Op>,
382 ) -> Result<TransactionResult, AdapterError> {
383 trace!("transact: {:?}", ops);
384 fail::fail_point!("catalog_transact", |arg| {
385 Err(AdapterError::Unstructured(anyhow::anyhow!(
386 "failpoint: {arg:?}"
387 )))
388 });
389
390 let drop_ids: BTreeSet<CatalogItemId> = ops
391 .iter()
392 .filter_map(|op| match op {
393 Op::DropObjects(drop_object_infos) => {
394 let ids = drop_object_infos.iter().map(|info| info.to_object_id());
395 let item_ids = ids.filter_map(|id| match id {
396 ObjectId::Item(id) => Some(id),
397 _ => None,
398 });
399 Some(item_ids)
400 }
401 _ => None,
402 })
403 .flatten()
404 .collect();
405 let temporary_drops = drop_ids
406 .iter()
407 .filter_map(|id| {
408 let entry = self.get_entry(id);
409 match entry.item().conn_id() {
410 Some(conn_id) => Some((conn_id, entry.name().item.clone())),
411 None => None,
412 }
413 })
414 .collect();
415 let dropped_global_ids = drop_ids
416 .iter()
417 .flat_map(|item_id| self.get_global_ids(item_id))
418 .collect();
419
420 let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
421 let mut builtin_table_updates = vec![];
422 let mut audit_events = vec![];
423 let mut storage = self.storage().await;
424 let mut tx = storage
425 .transaction()
426 .await
427 .unwrap_or_terminate("starting catalog transaction");
428 let mut state = self.state.clone();
430
431 Self::transact_inner(
432 storage_collections,
433 oracle_write_ts,
434 session,
435 ops,
436 temporary_ids,
437 &mut builtin_table_updates,
438 &mut audit_events,
439 &mut tx,
440 &mut state,
441 )
442 .await?;
443
444 tx.commit(oracle_write_ts)
449 .await
450 .unwrap_or_terminate("catalog storage transaction commit must succeed");
451
452 drop(storage);
455 self.state = state;
456 self.transient_revision += 1;
457
458 let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
460 if self.state.system_config().enable_mz_notices() {
461 self.state().pack_optimizer_notices(
463 &mut builtin_table_updates,
464 dropped_notices.iter(),
465 Diff::MINUS_ONE,
466 );
467 }
468
469 Ok(TransactionResult {
470 builtin_table_updates,
471 audit_events,
472 })
473 }
474
475 #[instrument(name = "catalog::transact_inner")]
482 async fn transact_inner(
483 storage_collections: Option<
484 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
485 >,
486 oracle_write_ts: mz_repr::Timestamp,
487 session: Option<&ConnMeta>,
488 mut ops: Vec<Op>,
489 temporary_ids: BTreeSet<CatalogItemId>,
490 builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
491 audit_events: &mut Vec<VersionedEvent>,
492 tx: &mut Transaction<'_>,
493 state: &mut CatalogState,
494 ) -> Result<(), AdapterError> {
495 let dry_run_ops = match ops.last() {
496 Some(Op::TransactionDryRun) => {
497 ops.pop();
499 assert!(!ops.is_empty(), "TransactionDryRun must not be the only op");
500 ops.clone()
501 }
502 Some(_) => vec![],
503 None => return Ok(()),
504 };
505
506 let mut storage_collections_to_create = BTreeSet::new();
507 let mut storage_collections_to_drop = BTreeSet::new();
508 let mut storage_collections_to_register = BTreeMap::new();
509
510 for op in ops {
511 let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
512 oracle_write_ts,
513 session,
514 op,
515 &temporary_ids,
516 audit_events,
517 tx,
518 state,
519 &mut storage_collections_to_create,
520 &mut storage_collections_to_drop,
521 &mut storage_collections_to_register,
522 )
523 .await?;
524
525 if let Some(builtin_table_update) = weird_builtin_table_update {
534 builtin_table_updates.push(builtin_table_update);
535 }
536
537 let op_id = tx.op_id().into();
542 let temporary_item_updates =
543 temporary_item_updates
544 .into_iter()
545 .map(|(item, diff)| StateUpdate {
546 kind: StateUpdateKind::TemporaryItem(item),
547 ts: op_id,
548 diff,
549 });
550
551 let mut updates: Vec<_> = tx.get_and_commit_op_updates();
552 updates.extend(temporary_item_updates);
553 let op_builtin_table_updates = state.apply_updates(updates)?;
554 let op_builtin_table_updates =
555 state.resolve_builtin_table_updates(op_builtin_table_updates);
556 builtin_table_updates.extend(op_builtin_table_updates);
557 }
558
559 if dry_run_ops.is_empty() {
560 if let Some(c) = storage_collections {
562 c.prepare_state(
563 tx,
564 storage_collections_to_create,
565 storage_collections_to_drop,
566 storage_collections_to_register,
567 )
568 .await?;
569 }
570
571 let updates = tx.get_and_commit_op_updates();
572 let op_builtin_table_updates = state.apply_updates(updates)?;
573 let op_builtin_table_updates =
574 state.resolve_builtin_table_updates(op_builtin_table_updates);
575 builtin_table_updates.extend(op_builtin_table_updates);
576
577 Ok(())
578 } else {
579 Err(AdapterError::TransactionDryRun {
580 new_ops: dry_run_ops,
581 new_state: state.clone(),
582 })
583 }
584 }
585
586 #[instrument]
594 async fn transact_op(
595 oracle_write_ts: mz_repr::Timestamp,
596 session: Option<&ConnMeta>,
597 op: Op,
598 temporary_ids: &BTreeSet<CatalogItemId>,
599 audit_events: &mut Vec<VersionedEvent>,
600 tx: &mut Transaction<'_>,
601 state: &CatalogState,
602 storage_collections_to_create: &mut BTreeSet<GlobalId>,
603 storage_collections_to_drop: &mut BTreeSet<GlobalId>,
604 storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
605 ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
606 let mut weird_builtin_table_update = None;
607 let mut temporary_item_updates = Vec::new();
608
609 match op {
610 Op::TransactionDryRun => {
611 unreachable!("TransactionDryRun can only be used a final element of ops")
612 }
613 Op::AlterRetainHistory { id, value, window } => {
614 let entry = state.get_entry(&id);
615 if id.is_system() {
616 let name = entry.name();
617 let full_name =
618 state.resolve_full_name(name, session.map(|session| session.conn_id()));
619 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
620 full_name.to_string(),
621 ))));
622 }
623
624 let mut new_entry = entry.clone();
625 let previous = new_entry
626 .item
627 .update_retain_history(value.clone(), window)
628 .map_err(|_| {
629 AdapterError::Catalog(Error::new(ErrorKind::Internal(
630 "planner should have rejected invalid alter retain history item type"
631 .to_string(),
632 )))
633 })?;
634
635 if Self::should_audit_log_item(new_entry.item()) {
636 let details =
637 EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
638 id: id.to_string(),
639 old_history: previous.map(|previous| previous.to_string()),
640 new_history: value.map(|v| v.to_string()),
641 });
642 CatalogState::add_to_audit_log(
643 &state.system_configuration,
644 oracle_write_ts,
645 session,
646 tx,
647 audit_events,
648 EventType::Alter,
649 catalog_type_to_audit_object_type(new_entry.item().typ()),
650 details,
651 )?;
652 }
653
654 tx.update_item(id, new_entry.into())?;
655
656 Self::log_update(state, &id);
657 }
658 Op::AlterRole {
659 id,
660 name,
661 attributes,
662 vars,
663 } => {
664 state.ensure_not_reserved_role(&id)?;
665
666 let mut existing_role = state.get_role(&id).clone();
667 existing_role.attributes = attributes;
668 existing_role.vars = vars;
669 tx.update_role(id, existing_role.into())?;
670
671 CatalogState::add_to_audit_log(
672 &state.system_configuration,
673 oracle_write_ts,
674 session,
675 tx,
676 audit_events,
677 EventType::Alter,
678 ObjectType::Role,
679 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
680 id: id.to_string(),
681 name: name.clone(),
682 }),
683 )?;
684
685 info!("update role {name} ({id})");
686 }
687 Op::AlterNetworkPolicy {
688 id,
689 rules,
690 name,
691 owner_id: _owner_id,
692 } => {
693 let existing_policy = state.get_network_policy(&id).clone();
694 let mut policy: NetworkPolicy = existing_policy.into();
695 policy.rules = rules;
696 if is_reserved_name(&name) {
697 return Err(AdapterError::Catalog(Error::new(
698 ErrorKind::ReservedNetworkPolicyName(name),
699 )));
700 }
701 tx.update_network_policy(id, policy.clone())?;
702
703 CatalogState::add_to_audit_log(
704 &state.system_configuration,
705 oracle_write_ts,
706 session,
707 tx,
708 audit_events,
709 EventType::Alter,
710 ObjectType::NetworkPolicy,
711 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
712 id: id.to_string(),
713 name: name.clone(),
714 }),
715 )?;
716
717 info!("update network policy {name} ({id})");
718 }
719 Op::AlterAddColumn {
720 id,
721 new_global_id,
722 name,
723 typ,
724 sql,
725 } => {
726 let mut new_entry = state.get_entry(&id).clone();
727 let version = new_entry.item.add_column(name, typ, sql)?;
728 let shard_id = state
731 .storage_metadata()
732 .get_collection_shard(new_entry.latest_global_id())?;
733
734 let CatalogItem::Table(table) = &mut new_entry.item else {
736 return Err(AdapterError::Unsupported("adding columns to non-Table"));
737 };
738 table.collections.insert(version, new_global_id);
739
740 tx.update_item(id, new_entry.into())?;
741 storage_collections_to_register.insert(new_global_id, shard_id);
742 }
743 Op::CreateDatabase { name, owner_id } => {
744 let database_owner_privileges = vec![rbac::owner_privilege(
745 mz_sql::catalog::ObjectType::Database,
746 owner_id,
747 )];
748 let database_default_privileges = state
749 .default_privileges
750 .get_applicable_privileges(
751 owner_id,
752 None,
753 None,
754 mz_sql::catalog::ObjectType::Database,
755 )
756 .map(|item| item.mz_acl_item(owner_id));
757 let database_privileges: Vec<_> = merge_mz_acl_items(
758 database_owner_privileges
759 .into_iter()
760 .chain(database_default_privileges),
761 )
762 .collect();
763
764 let schema_owner_privileges = vec![rbac::owner_privilege(
765 mz_sql::catalog::ObjectType::Schema,
766 owner_id,
767 )];
768 let schema_default_privileges = state
769 .default_privileges
770 .get_applicable_privileges(
771 owner_id,
772 None,
773 None,
774 mz_sql::catalog::ObjectType::Schema,
775 )
776 .map(|item| item.mz_acl_item(owner_id))
777 .chain(std::iter::once(MzAclItem {
779 grantee: RoleId::Public,
780 grantor: owner_id,
781 acl_mode: AclMode::USAGE,
782 }));
783 let schema_privileges: Vec<_> = merge_mz_acl_items(
784 schema_owner_privileges
785 .into_iter()
786 .chain(schema_default_privileges),
787 )
788 .collect();
789
790 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
791 let (database_id, _) = tx.insert_user_database(
792 &name,
793 owner_id,
794 database_privileges.clone(),
795 &temporary_oids,
796 )?;
797 let (schema_id, _) = tx.insert_user_schema(
798 database_id,
799 DEFAULT_SCHEMA,
800 owner_id,
801 schema_privileges.clone(),
802 &temporary_oids,
803 )?;
804 CatalogState::add_to_audit_log(
805 &state.system_configuration,
806 oracle_write_ts,
807 session,
808 tx,
809 audit_events,
810 EventType::Create,
811 ObjectType::Database,
812 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
813 id: database_id.to_string(),
814 name: name.clone(),
815 }),
816 )?;
817 info!("create database {}", name);
818
819 CatalogState::add_to_audit_log(
820 &state.system_configuration,
821 oracle_write_ts,
822 session,
823 tx,
824 audit_events,
825 EventType::Create,
826 ObjectType::Schema,
827 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
828 id: schema_id.to_string(),
829 name: DEFAULT_SCHEMA.to_string(),
830 database_name: Some(name),
831 }),
832 )?;
833 }
834 Op::CreateSchema {
835 database_id,
836 schema_name,
837 owner_id,
838 } => {
839 if is_reserved_name(&schema_name) {
840 return Err(AdapterError::Catalog(Error::new(
841 ErrorKind::ReservedSchemaName(schema_name),
842 )));
843 }
844 let database_id = match database_id {
845 ResolvedDatabaseSpecifier::Id(id) => id,
846 ResolvedDatabaseSpecifier::Ambient => {
847 return Err(AdapterError::Catalog(Error::new(
848 ErrorKind::ReadOnlySystemSchema(schema_name),
849 )));
850 }
851 };
852 let owner_privileges = vec![rbac::owner_privilege(
853 mz_sql::catalog::ObjectType::Schema,
854 owner_id,
855 )];
856 let default_privileges = state
857 .default_privileges
858 .get_applicable_privileges(
859 owner_id,
860 Some(database_id),
861 None,
862 mz_sql::catalog::ObjectType::Schema,
863 )
864 .map(|item| item.mz_acl_item(owner_id));
865 let privileges: Vec<_> =
866 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
867 .collect();
868 let (schema_id, _) = tx.insert_user_schema(
869 database_id,
870 &schema_name,
871 owner_id,
872 privileges.clone(),
873 &state.get_temporary_oids().collect(),
874 )?;
875 CatalogState::add_to_audit_log(
876 &state.system_configuration,
877 oracle_write_ts,
878 session,
879 tx,
880 audit_events,
881 EventType::Create,
882 ObjectType::Schema,
883 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
884 id: schema_id.to_string(),
885 name: schema_name.clone(),
886 database_name: Some(state.database_by_id[&database_id].name.clone()),
887 }),
888 )?;
889 }
890 Op::CreateRole { name, attributes } => {
891 if is_reserved_role_name(&name) {
892 return Err(AdapterError::Catalog(Error::new(
893 ErrorKind::ReservedRoleName(name),
894 )));
895 }
896 let membership = RoleMembership::new();
897 let vars = RoleVars::default();
898 let (id, _) = tx.insert_user_role(
899 name.clone(),
900 attributes.clone(),
901 membership.clone(),
902 vars.clone(),
903 &state.get_temporary_oids().collect(),
904 )?;
905 CatalogState::add_to_audit_log(
906 &state.system_configuration,
907 oracle_write_ts,
908 session,
909 tx,
910 audit_events,
911 EventType::Create,
912 ObjectType::Role,
913 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
914 id: id.to_string(),
915 name: name.clone(),
916 }),
917 )?;
918 info!("create role {}", name);
919 }
920 Op::CreateCluster {
921 id,
922 name,
923 introspection_sources,
924 owner_id,
925 config,
926 } => {
927 if is_reserved_name(&name) {
928 return Err(AdapterError::Catalog(Error::new(
929 ErrorKind::ReservedClusterName(name),
930 )));
931 }
932 let owner_privileges = vec![rbac::owner_privilege(
933 mz_sql::catalog::ObjectType::Cluster,
934 owner_id,
935 )];
936 let default_privileges = state
937 .default_privileges
938 .get_applicable_privileges(
939 owner_id,
940 None,
941 None,
942 mz_sql::catalog::ObjectType::Cluster,
943 )
944 .map(|item| item.mz_acl_item(owner_id));
945 let privileges: Vec<_> =
946 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
947 .collect();
948 let introspection_source_ids: Vec<_> = introspection_sources
949 .iter()
950 .map(|introspection_source| {
951 Transaction::allocate_introspection_source_index_id(
952 &id,
953 introspection_source.variant,
954 )
955 })
956 .collect();
957
958 let introspection_sources = introspection_sources
959 .into_iter()
960 .zip_eq(introspection_source_ids)
961 .map(|(log, (item_id, gid))| (log, item_id, gid))
962 .collect();
963
964 tx.insert_user_cluster(
965 id,
966 &name,
967 introspection_sources,
968 owner_id,
969 privileges.clone(),
970 config.clone().into(),
971 &state.get_temporary_oids().collect(),
972 )?;
973 CatalogState::add_to_audit_log(
974 &state.system_configuration,
975 oracle_write_ts,
976 session,
977 tx,
978 audit_events,
979 EventType::Create,
980 ObjectType::Cluster,
981 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
982 id: id.to_string(),
983 name: name.clone(),
984 }),
985 )?;
986 info!("create cluster {}", name);
987 }
988 Op::CreateClusterReplica {
989 cluster_id,
990 name,
991 config,
992 owner_id,
993 reason,
994 } => {
995 if is_reserved_name(&name) {
996 return Err(AdapterError::Catalog(Error::new(
997 ErrorKind::ReservedReplicaName(name),
998 )));
999 }
1000 let cluster = state.get_cluster(cluster_id);
1001 let id =
1002 tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1003 if let ReplicaLocation::Managed(ManagedReplicaLocation {
1004 size,
1005 disk,
1006 billed_as,
1007 internal,
1008 ..
1009 }) = &config.location
1010 {
1011 let (reason, scheduling_policies) = reason.into_audit_log();
1012 let details = EventDetails::CreateClusterReplicaV3(
1013 mz_audit_log::CreateClusterReplicaV3 {
1014 cluster_id: cluster_id.to_string(),
1015 cluster_name: cluster.name.clone(),
1016 replica_id: Some(id.to_string()),
1017 replica_name: name.clone(),
1018 logical_size: size.clone(),
1019 disk: *disk,
1020 billed_as: billed_as.clone(),
1021 internal: *internal,
1022 reason,
1023 scheduling_policies,
1024 },
1025 );
1026 CatalogState::add_to_audit_log(
1027 &state.system_configuration,
1028 oracle_write_ts,
1029 session,
1030 tx,
1031 audit_events,
1032 EventType::Create,
1033 ObjectType::ClusterReplica,
1034 details,
1035 )?;
1036 }
1037 }
1038 Op::CreateItem {
1039 id,
1040 name,
1041 item,
1042 owner_id,
1043 } => {
1044 state.check_unstable_dependencies(&item)?;
1045
1046 match &item {
1047 CatalogItem::Table(table) => {
1048 let gids: Vec<_> = table.global_ids().collect();
1049 assert_eq!(gids.len(), 1);
1050 storage_collections_to_create.extend(gids);
1051 }
1052 CatalogItem::Source(source) => {
1053 storage_collections_to_create.insert(source.global_id());
1054 }
1055 CatalogItem::MaterializedView(mv) => {
1056 storage_collections_to_create.insert(mv.global_id());
1057 }
1058 CatalogItem::ContinualTask(ct) => {
1059 storage_collections_to_create.insert(ct.global_id());
1060 }
1061 CatalogItem::Sink(sink) => {
1062 storage_collections_to_create.insert(sink.global_id());
1063 }
1064 CatalogItem::Log(_)
1065 | CatalogItem::View(_)
1066 | CatalogItem::Index(_)
1067 | CatalogItem::Type(_)
1068 | CatalogItem::Func(_)
1069 | CatalogItem::Secret(_)
1070 | CatalogItem::Connection(_) => (),
1071 }
1072
1073 let system_user = session.map_or(false, |s| s.user().is_system_user());
1074 if !system_user {
1075 if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1076 let cluster_name = state.clusters_by_id[&id].name.clone();
1077 return Err(AdapterError::Catalog(Error::new(
1078 ErrorKind::ReadOnlyCluster(cluster_name),
1079 )));
1080 }
1081 }
1082
1083 let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1084 let default_privileges = state
1085 .default_privileges
1086 .get_applicable_privileges(
1087 owner_id,
1088 name.qualifiers.database_spec.id(),
1089 Some(name.qualifiers.schema_spec.into()),
1090 item.typ().into(),
1091 )
1092 .map(|item| item.mz_acl_item(owner_id));
1093 let progress_source_privilege = if item.is_progress_source() {
1095 Some(MzAclItem {
1096 grantee: MZ_SUPPORT_ROLE_ID,
1097 grantor: owner_id,
1098 acl_mode: AclMode::SELECT,
1099 })
1100 } else {
1101 None
1102 };
1103 let privileges: Vec<_> = merge_mz_acl_items(
1104 owner_privileges
1105 .into_iter()
1106 .chain(default_privileges)
1107 .chain(progress_source_privilege),
1108 )
1109 .collect();
1110
1111 let temporary_oids = state.get_temporary_oids().collect();
1112
1113 if item.is_temporary() {
1114 if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1115 || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1116 {
1117 return Err(AdapterError::Catalog(Error::new(
1118 ErrorKind::InvalidTemporarySchema,
1119 )));
1120 }
1121 let oid = tx.allocate_oid(&temporary_oids)?;
1122 let item = TemporaryItem {
1123 id,
1124 oid,
1125 name: name.clone(),
1126 item: item.clone(),
1127 owner_id,
1128 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1129 };
1130 temporary_item_updates.push((item, StateDiff::Addition));
1131 } else {
1132 if let Some(temp_id) =
1133 item.uses()
1134 .iter()
1135 .find(|id| match state.try_get_entry(*id) {
1136 Some(entry) => entry.item().is_temporary(),
1137 None => temporary_ids.contains(id),
1138 })
1139 {
1140 let temp_item = state.get_entry(temp_id);
1141 return Err(AdapterError::Catalog(Error::new(
1142 ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1143 )));
1144 }
1145 if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1146 && !system_user
1147 {
1148 let schema_name = state
1149 .resolve_full_name(&name, session.map(|session| session.conn_id()))
1150 .schema;
1151 return Err(AdapterError::Catalog(Error::new(
1152 ErrorKind::ReadOnlySystemSchema(schema_name),
1153 )));
1154 }
1155 let schema_id = name.qualifiers.schema_spec.clone().into();
1156 let item_type = item.typ();
1157 let (create_sql, global_id, versions) = item.to_serialized();
1158 tx.insert_user_item(
1159 id,
1160 global_id,
1161 schema_id,
1162 &name.item,
1163 create_sql,
1164 owner_id,
1165 privileges.clone(),
1166 &temporary_oids,
1167 versions,
1168 )?;
1169 info!(
1170 "create {} {} ({})",
1171 item_type,
1172 state.resolve_full_name(&name, None),
1173 id
1174 );
1175 }
1176
1177 if Self::should_audit_log_item(&item) {
1178 let name = Self::full_name_detail(
1179 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1180 );
1181 let details = match &item {
1182 CatalogItem::Source(s) => {
1183 let cluster_id = match s.data_source {
1184 DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1187 match state.get_entry(&ingestion_id).cluster_id() {
1188 Some(cluster_id) => Some(cluster_id.to_string()),
1189 None => None,
1190 }
1191 }
1192 _ => match item.cluster_id() {
1193 Some(cluster_id) => Some(cluster_id.to_string()),
1194 None => None,
1195 },
1196 };
1197
1198 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1199 id: id.to_string(),
1200 cluster_id,
1201 name,
1202 external_type: s.source_type().to_string(),
1203 })
1204 }
1205 CatalogItem::Sink(s) => {
1206 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1207 id: id.to_string(),
1208 cluster_id: Some(s.cluster_id.to_string()),
1209 name,
1210 external_type: s.sink_type().to_string(),
1211 })
1212 }
1213 CatalogItem::Index(i) => {
1214 EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1215 id: id.to_string(),
1216 name,
1217 cluster_id: i.cluster_id.to_string(),
1218 })
1219 }
1220 CatalogItem::MaterializedView(mv) => {
1221 EventDetails::CreateMaterializedViewV1(
1222 mz_audit_log::CreateMaterializedViewV1 {
1223 id: id.to_string(),
1224 name,
1225 cluster_id: mv.cluster_id.to_string(),
1226 },
1227 )
1228 }
1229 _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1230 id: id.to_string(),
1231 name,
1232 }),
1233 };
1234 CatalogState::add_to_audit_log(
1235 &state.system_configuration,
1236 oracle_write_ts,
1237 session,
1238 tx,
1239 audit_events,
1240 EventType::Create,
1241 catalog_type_to_audit_object_type(item.typ()),
1242 details,
1243 )?;
1244 }
1245 }
1246 Op::CreateNetworkPolicy {
1247 rules,
1248 name,
1249 owner_id,
1250 } => {
1251 if state.network_policies_by_name.contains_key(&name) {
1252 return Err(AdapterError::PlanError(PlanError::Catalog(
1253 SqlCatalogError::NetworkPolicyAlreadyExists(name),
1254 )));
1255 }
1256 if is_reserved_name(&name) {
1257 return Err(AdapterError::Catalog(Error::new(
1258 ErrorKind::ReservedNetworkPolicyName(name),
1259 )));
1260 }
1261
1262 let owner_privileges = vec![rbac::owner_privilege(
1263 mz_sql::catalog::ObjectType::NetworkPolicy,
1264 owner_id,
1265 )];
1266 let default_privileges = state
1267 .default_privileges
1268 .get_applicable_privileges(
1269 owner_id,
1270 None,
1271 None,
1272 mz_sql::catalog::ObjectType::NetworkPolicy,
1273 )
1274 .map(|item| item.mz_acl_item(owner_id));
1275 let privileges: Vec<_> =
1276 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1277 .collect();
1278
1279 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1280 let id = tx.insert_user_network_policy(
1281 name.clone(),
1282 rules,
1283 privileges,
1284 owner_id,
1285 &temporary_oids,
1286 )?;
1287
1288 CatalogState::add_to_audit_log(
1289 &state.system_configuration,
1290 oracle_write_ts,
1291 session,
1292 tx,
1293 audit_events,
1294 EventType::Create,
1295 ObjectType::NetworkPolicy,
1296 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1297 id: id.to_string(),
1298 name: name.clone(),
1299 }),
1300 )?;
1301
1302 info!("created network policy {name} ({id})");
1303 }
1304 Op::Comment {
1305 object_id,
1306 sub_component,
1307 comment,
1308 } => {
1309 tx.update_comment(object_id, sub_component, comment)?;
1310 let entry = state.get_comment_id_entry(&object_id);
1311 let should_log = entry
1312 .map(|entry| Self::should_audit_log_item(entry.item()))
1313 .unwrap_or(true);
1315 if let (Some(conn_id), true) =
1318 (session.map(|session| session.conn_id()), should_log)
1319 {
1320 CatalogState::add_to_audit_log(
1321 &state.system_configuration,
1322 oracle_write_ts,
1323 session,
1324 tx,
1325 audit_events,
1326 EventType::Comment,
1327 comment_id_to_audit_object_type(object_id),
1328 EventDetails::IdNameV1(IdNameV1 {
1329 id: format!("{object_id:?}"),
1331 name: state.comment_id_to_audit_log_name(object_id, conn_id),
1332 }),
1333 )?;
1334 }
1335 }
1336 Op::UpdateSourceReferences {
1337 source_id,
1338 references,
1339 } => {
1340 tx.update_source_references(
1341 source_id,
1342 references
1343 .references
1344 .into_iter()
1345 .map(|reference| reference.into())
1346 .collect(),
1347 references.updated_at,
1348 )?;
1349 }
1350 Op::DropObjects(drop_object_infos) => {
1351 let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1353
1354 tx.drop_comments(&delta.comments)?;
1356
1357 let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1359 delta
1360 .items
1361 .iter()
1362 .map(|id| id)
1363 .partition(|id| !state.get_entry(*id).item().is_temporary());
1364 tx.remove_items(&durable_items_to_drop)?;
1365 temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1366 let entry = state.get_entry(&id);
1367 (entry.clone().into(), StateDiff::Retraction)
1368 }));
1369
1370 for item_id in delta.items {
1371 let entry = state.get_entry(&item_id);
1372
1373 if entry.item().is_storage_collection() {
1374 storage_collections_to_drop.extend(entry.global_ids());
1375 }
1376
1377 if state.source_references.contains_key(&item_id) {
1378 tx.remove_source_references(item_id)?;
1379 }
1380
1381 if Self::should_audit_log_item(entry.item()) {
1382 CatalogState::add_to_audit_log(
1383 &state.system_configuration,
1384 oracle_write_ts,
1385 session,
1386 tx,
1387 audit_events,
1388 EventType::Drop,
1389 catalog_type_to_audit_object_type(entry.item().typ()),
1390 EventDetails::IdFullNameV1(IdFullNameV1 {
1391 id: item_id.to_string(),
1392 name: Self::full_name_detail(&state.resolve_full_name(
1393 entry.name(),
1394 session.map(|session| session.conn_id()),
1395 )),
1396 }),
1397 )?;
1398 }
1399 info!(
1400 "drop {} {} ({})",
1401 entry.item_type(),
1402 state.resolve_full_name(entry.name(), entry.conn_id()),
1403 item_id
1404 );
1405 }
1406
1407 let schemas = delta
1409 .schemas
1410 .iter()
1411 .map(|(schema_spec, database_spec)| {
1412 (SchemaId::from(schema_spec), *database_spec)
1413 })
1414 .collect();
1415 tx.remove_schemas(&schemas)?;
1416
1417 for (schema_spec, database_spec) in delta.schemas {
1418 let schema = state.get_schema(
1419 &database_spec,
1420 &schema_spec,
1421 session
1422 .map(|session| session.conn_id())
1423 .unwrap_or(&SYSTEM_CONN_ID),
1424 );
1425
1426 let schema_id = SchemaId::from(schema_spec);
1427 let database_id = match database_spec {
1428 ResolvedDatabaseSpecifier::Ambient => None,
1429 ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1430 };
1431
1432 CatalogState::add_to_audit_log(
1433 &state.system_configuration,
1434 oracle_write_ts,
1435 session,
1436 tx,
1437 audit_events,
1438 EventType::Drop,
1439 ObjectType::Schema,
1440 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1441 id: schema_id.to_string(),
1442 name: schema.name.schema.to_string(),
1443 database_name: database_id
1444 .map(|database_id| state.database_by_id[&database_id].name.clone()),
1445 }),
1446 )?;
1447 }
1448
1449 tx.remove_databases(&delta.databases)?;
1451
1452 for database_id in delta.databases {
1453 let database = state.get_database(&database_id).clone();
1454
1455 CatalogState::add_to_audit_log(
1456 &state.system_configuration,
1457 oracle_write_ts,
1458 session,
1459 tx,
1460 audit_events,
1461 EventType::Drop,
1462 ObjectType::Database,
1463 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1464 id: database_id.to_string(),
1465 name: database.name.clone(),
1466 }),
1467 )?;
1468 }
1469
1470 tx.remove_user_roles(&delta.roles)?;
1472
1473 for role_id in delta.roles {
1474 let role = state
1475 .roles_by_id
1476 .get(&role_id)
1477 .expect("catalog out of sync");
1478
1479 CatalogState::add_to_audit_log(
1480 &state.system_configuration,
1481 oracle_write_ts,
1482 session,
1483 tx,
1484 audit_events,
1485 EventType::Drop,
1486 ObjectType::Role,
1487 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1488 id: role.id.to_string(),
1489 name: role.name.clone(),
1490 }),
1491 )?;
1492 info!("drop role {}", role.name());
1493 }
1494
1495 tx.remove_network_policies(&delta.network_policies)?;
1497
1498 for network_policy_id in delta.network_policies {
1499 let policy = state
1500 .network_policies_by_id
1501 .get(&network_policy_id)
1502 .expect("catalog out of sync");
1503
1504 CatalogState::add_to_audit_log(
1505 &state.system_configuration,
1506 oracle_write_ts,
1507 session,
1508 tx,
1509 audit_events,
1510 EventType::Drop,
1511 ObjectType::NetworkPolicy,
1512 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1513 id: policy.id.to_string(),
1514 name: policy.name.clone(),
1515 }),
1516 )?;
1517 info!("drop network policy {}", policy.name.clone());
1518 }
1519
1520 let replicas = delta.replicas.keys().copied().collect();
1522 tx.remove_cluster_replicas(&replicas)?;
1523
1524 for (replica_id, (cluster_id, reason)) in delta.replicas {
1525 let cluster = state.get_cluster(cluster_id);
1526 let replica = cluster.replica(replica_id).expect("Must exist");
1527
1528 let (reason, scheduling_policies) = reason.into_audit_log();
1529 let details =
1530 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1531 cluster_id: cluster_id.to_string(),
1532 cluster_name: cluster.name.clone(),
1533 replica_id: Some(replica_id.to_string()),
1534 replica_name: replica.name.clone(),
1535 reason,
1536 scheduling_policies,
1537 });
1538 CatalogState::add_to_audit_log(
1539 &state.system_configuration,
1540 oracle_write_ts,
1541 session,
1542 tx,
1543 audit_events,
1544 EventType::Drop,
1545 ObjectType::ClusterReplica,
1546 details,
1547 )?;
1548 }
1549
1550 tx.remove_clusters(&delta.clusters)?;
1552
1553 for cluster_id in delta.clusters {
1554 let cluster = state.get_cluster(cluster_id);
1555
1556 CatalogState::add_to_audit_log(
1557 &state.system_configuration,
1558 oracle_write_ts,
1559 session,
1560 tx,
1561 audit_events,
1562 EventType::Drop,
1563 ObjectType::Cluster,
1564 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1565 id: cluster.id.to_string(),
1566 name: cluster.name.clone(),
1567 }),
1568 )?;
1569 }
1570 }
1571 Op::GrantRole {
1572 role_id,
1573 member_id,
1574 grantor_id,
1575 } => {
1576 state.ensure_not_reserved_role(&member_id)?;
1577 state.ensure_grantable_role(&role_id)?;
1578 if state.collect_role_membership(&role_id).contains(&member_id) {
1579 let group_role = state.get_role(&role_id);
1580 let member_role = state.get_role(&member_id);
1581 return Err(AdapterError::Catalog(Error::new(
1582 ErrorKind::CircularRoleMembership {
1583 role_name: group_role.name().to_string(),
1584 member_name: member_role.name().to_string(),
1585 },
1586 )));
1587 }
1588 let mut member_role = state.get_role(&member_id).clone();
1589 member_role.membership.map.insert(role_id, grantor_id);
1590 tx.update_role(member_id, member_role.into())?;
1591
1592 CatalogState::add_to_audit_log(
1593 &state.system_configuration,
1594 oracle_write_ts,
1595 session,
1596 tx,
1597 audit_events,
1598 EventType::Grant,
1599 ObjectType::Role,
1600 EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1601 role_id: role_id.to_string(),
1602 member_id: member_id.to_string(),
1603 grantor_id: grantor_id.to_string(),
1604 executed_by: session
1605 .map(|session| session.authenticated_role_id())
1606 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1607 .to_string(),
1608 }),
1609 )?;
1610 }
1611 Op::RevokeRole {
1612 role_id,
1613 member_id,
1614 grantor_id,
1615 } => {
1616 state.ensure_not_reserved_role(&member_id)?;
1617 state.ensure_grantable_role(&role_id)?;
1618 let mut member_role = state.get_role(&member_id).clone();
1619 member_role.membership.map.remove(&role_id);
1620 tx.update_role(member_id, member_role.into())?;
1621
1622 CatalogState::add_to_audit_log(
1623 &state.system_configuration,
1624 oracle_write_ts,
1625 session,
1626 tx,
1627 audit_events,
1628 EventType::Revoke,
1629 ObjectType::Role,
1630 EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1631 role_id: role_id.to_string(),
1632 member_id: member_id.to_string(),
1633 grantor_id: grantor_id.to_string(),
1634 executed_by: session
1635 .map(|session| session.authenticated_role_id())
1636 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1637 .to_string(),
1638 }),
1639 )?;
1640 }
1641 Op::UpdatePrivilege {
1642 target_id,
1643 privilege,
1644 variant,
1645 } => {
1646 let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1647 UpdatePrivilegeVariant::Grant => {
1648 privileges.grant(privilege);
1649 }
1650 UpdatePrivilegeVariant::Revoke => {
1651 privileges.revoke(&privilege);
1652 }
1653 };
1654 match &target_id {
1655 SystemObjectId::Object(object_id) => match object_id {
1656 ObjectId::Cluster(id) => {
1657 let mut cluster = state.get_cluster(*id).clone();
1658 update_privilege_fn(&mut cluster.privileges);
1659 tx.update_cluster(*id, cluster.into())?;
1660 }
1661 ObjectId::Database(id) => {
1662 let mut database = state.get_database(id).clone();
1663 update_privilege_fn(&mut database.privileges);
1664 tx.update_database(*id, database.into())?;
1665 }
1666 ObjectId::NetworkPolicy(id) => {
1667 let mut policy = state.get_network_policy(id).clone();
1668 update_privilege_fn(&mut policy.privileges);
1669 tx.update_network_policy(*id, policy.into())?;
1670 }
1671 ObjectId::Schema((database_spec, schema_spec)) => {
1672 let schema_id = schema_spec.clone().into();
1673 let mut schema = state
1674 .get_schema(
1675 database_spec,
1676 schema_spec,
1677 session
1678 .map(|session| session.conn_id())
1679 .unwrap_or(&SYSTEM_CONN_ID),
1680 )
1681 .clone();
1682 update_privilege_fn(&mut schema.privileges);
1683 tx.update_schema(schema_id, schema.into())?;
1684 }
1685 ObjectId::Item(id) => {
1686 let entry = state.get_entry(id);
1687 let mut new_entry = entry.clone();
1688 update_privilege_fn(&mut new_entry.privileges);
1689 if !new_entry.item().is_temporary() {
1690 tx.update_item(*id, new_entry.into())?;
1691 } else {
1692 temporary_item_updates
1693 .push((entry.clone().into(), StateDiff::Retraction));
1694 temporary_item_updates
1695 .push((new_entry.into(), StateDiff::Addition));
1696 }
1697 }
1698 ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
1699 },
1700 SystemObjectId::System => {
1701 let mut system_privileges = state.system_privileges.clone();
1702 update_privilege_fn(&mut system_privileges);
1703 let new_privilege =
1704 system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
1705 tx.set_system_privilege(
1706 privilege.grantee,
1707 privilege.grantor,
1708 new_privilege.map(|new_privilege| new_privilege.acl_mode),
1709 )?;
1710 }
1711 }
1712 let object_type = state.get_system_object_type(&target_id);
1713 let object_id_str = match &target_id {
1714 SystemObjectId::System => "SYSTEM".to_string(),
1715 SystemObjectId::Object(id) => id.to_string(),
1716 };
1717 CatalogState::add_to_audit_log(
1718 &state.system_configuration,
1719 oracle_write_ts,
1720 session,
1721 tx,
1722 audit_events,
1723 variant.into(),
1724 system_object_type_to_audit_object_type(&object_type),
1725 EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
1726 object_id: object_id_str,
1727 grantee_id: privilege.grantee.to_string(),
1728 grantor_id: privilege.grantor.to_string(),
1729 privileges: privilege.acl_mode.to_string(),
1730 }),
1731 )?;
1732 }
1733 Op::UpdateDefaultPrivilege {
1734 privilege_object,
1735 privilege_acl_item,
1736 variant,
1737 } => {
1738 let mut default_privileges = state.default_privileges.clone();
1739 match variant {
1740 UpdatePrivilegeVariant::Grant => default_privileges
1741 .grant(privilege_object.clone(), privilege_acl_item.clone()),
1742 UpdatePrivilegeVariant::Revoke => {
1743 default_privileges.revoke(&privilege_object, &privilege_acl_item)
1744 }
1745 }
1746 let new_acl_mode = default_privileges
1747 .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
1748 tx.set_default_privilege(
1749 privilege_object.role_id,
1750 privilege_object.database_id,
1751 privilege_object.schema_id,
1752 privilege_object.object_type,
1753 privilege_acl_item.grantee,
1754 new_acl_mode.cloned(),
1755 )?;
1756 CatalogState::add_to_audit_log(
1757 &state.system_configuration,
1758 oracle_write_ts,
1759 session,
1760 tx,
1761 audit_events,
1762 variant.into(),
1763 object_type_to_audit_object_type(privilege_object.object_type),
1764 EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
1765 role_id: privilege_object.role_id.to_string(),
1766 database_id: privilege_object.database_id.map(|id| id.to_string()),
1767 schema_id: privilege_object.schema_id.map(|id| id.to_string()),
1768 grantee_id: privilege_acl_item.grantee.to_string(),
1769 privileges: privilege_acl_item.acl_mode.to_string(),
1770 }),
1771 )?;
1772 }
1773 Op::RenameCluster {
1774 id,
1775 name,
1776 to_name,
1777 check_reserved_names,
1778 } => {
1779 if id.is_system() {
1780 return Err(AdapterError::Catalog(Error::new(
1781 ErrorKind::ReadOnlyCluster(name.clone()),
1782 )));
1783 }
1784 if check_reserved_names && is_reserved_name(&to_name) {
1785 return Err(AdapterError::Catalog(Error::new(
1786 ErrorKind::ReservedClusterName(to_name),
1787 )));
1788 }
1789 tx.rename_cluster(id, &name, &to_name)?;
1790 CatalogState::add_to_audit_log(
1791 &state.system_configuration,
1792 oracle_write_ts,
1793 session,
1794 tx,
1795 audit_events,
1796 EventType::Alter,
1797 ObjectType::Cluster,
1798 EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
1799 id: id.to_string(),
1800 old_name: name.clone(),
1801 new_name: to_name.clone(),
1802 }),
1803 )?;
1804 info!("rename cluster {name} to {to_name}");
1805 }
1806 Op::RenameClusterReplica {
1807 cluster_id,
1808 replica_id,
1809 name,
1810 to_name,
1811 } => {
1812 if is_reserved_name(&to_name) {
1813 return Err(AdapterError::Catalog(Error::new(
1814 ErrorKind::ReservedReplicaName(to_name),
1815 )));
1816 }
1817 tx.rename_cluster_replica(replica_id, &name, &to_name)?;
1818 CatalogState::add_to_audit_log(
1819 &state.system_configuration,
1820 oracle_write_ts,
1821 session,
1822 tx,
1823 audit_events,
1824 EventType::Alter,
1825 ObjectType::ClusterReplica,
1826 EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
1827 cluster_id: cluster_id.to_string(),
1828 replica_id: replica_id.to_string(),
1829 old_name: name.replica.as_str().to_string(),
1830 new_name: to_name.clone(),
1831 }),
1832 )?;
1833 info!("rename cluster replica {name} to {to_name}");
1834 }
1835 Op::RenameItem {
1836 id,
1837 to_name,
1838 current_full_name,
1839 } => {
1840 let mut updates = Vec::new();
1841
1842 let entry = state.get_entry(&id);
1843 if let CatalogItem::Type(_) = entry.item() {
1844 return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
1845 current_full_name.to_string(),
1846 ))));
1847 }
1848
1849 if entry.id().is_system() {
1850 let name = state
1851 .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
1852 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
1853 name.to_string(),
1854 ))));
1855 }
1856
1857 let mut to_full_name = current_full_name.clone();
1858 to_full_name.item.clone_from(&to_name);
1859
1860 let mut to_qualified_name = entry.name().clone();
1861 to_qualified_name.item.clone_from(&to_name);
1862
1863 let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
1864 id: id.to_string(),
1865 old_name: Self::full_name_detail(¤t_full_name),
1866 new_name: Self::full_name_detail(&to_full_name),
1867 });
1868 if Self::should_audit_log_item(entry.item()) {
1869 CatalogState::add_to_audit_log(
1870 &state.system_configuration,
1871 oracle_write_ts,
1872 session,
1873 tx,
1874 audit_events,
1875 EventType::Alter,
1876 catalog_type_to_audit_object_type(entry.item().typ()),
1877 details,
1878 )?;
1879 }
1880
1881 let mut new_entry = entry.clone();
1883 new_entry.name.item.clone_from(&to_name);
1884 new_entry.item = entry
1885 .item()
1886 .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
1887 .map_err(|e| {
1888 Error::new(ErrorKind::from(AmbiguousRename {
1889 depender: state
1890 .resolve_full_name(entry.name(), entry.conn_id())
1891 .to_string(),
1892 dependee: state
1893 .resolve_full_name(entry.name(), entry.conn_id())
1894 .to_string(),
1895 message: e,
1896 }))
1897 })?;
1898
1899 for id in entry.referenced_by() {
1900 let dependent_item = state.get_entry(id);
1901 let mut to_entry = dependent_item.clone();
1902 to_entry.item = dependent_item
1903 .item()
1904 .rename_item_refs(
1905 current_full_name.clone(),
1906 to_full_name.item.clone(),
1907 false,
1908 )
1909 .map_err(|e| {
1910 Error::new(ErrorKind::from(AmbiguousRename {
1911 depender: state
1912 .resolve_full_name(
1913 dependent_item.name(),
1914 dependent_item.conn_id(),
1915 )
1916 .to_string(),
1917 dependee: state
1918 .resolve_full_name(entry.name(), entry.conn_id())
1919 .to_string(),
1920 message: e,
1921 }))
1922 })?;
1923
1924 if !to_entry.item().is_temporary() {
1925 tx.update_item(*id, to_entry.into())?;
1926 } else {
1927 temporary_item_updates
1928 .push((dependent_item.clone().into(), StateDiff::Retraction));
1929 temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
1930 }
1931 updates.push(*id);
1932 }
1933 if !new_entry.item().is_temporary() {
1934 tx.update_item(id, new_entry.into())?;
1935 } else {
1936 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
1937 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
1938 }
1939
1940 updates.push(id);
1941 for id in updates {
1942 Self::log_update(state, &id);
1943 }
1944 }
1945 Op::RenameSchema {
1946 database_spec,
1947 schema_spec,
1948 new_name,
1949 check_reserved_names,
1950 } => {
1951 if check_reserved_names && is_reserved_name(&new_name) {
1952 return Err(AdapterError::Catalog(Error::new(
1953 ErrorKind::ReservedSchemaName(new_name),
1954 )));
1955 }
1956
1957 let conn_id = session
1958 .map(|session| session.conn_id())
1959 .unwrap_or(&SYSTEM_CONN_ID);
1960
1961 let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
1962 let cur_name = schema.name().schema.clone();
1963
1964 let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
1965 return Err(AdapterError::Catalog(Error::new(
1966 ErrorKind::AmbientSchemaRename(cur_name),
1967 )));
1968 };
1969 let database = state.get_database(&database_id);
1970 let database_name = &database.name;
1971
1972 let mut updates = Vec::new();
1973 let mut items_to_update = BTreeMap::new();
1974
1975 let mut update_item = |id| {
1976 if items_to_update.contains_key(id) {
1977 return Ok(());
1978 }
1979
1980 let entry = state.get_entry(id);
1981
1982 let mut new_entry = entry.clone();
1984 new_entry.item = entry
1985 .item
1986 .rename_schema_refs(database_name, &cur_name, &new_name)
1987 .map_err(|(s, _i)| {
1988 Error::new(ErrorKind::from(AmbiguousRename {
1989 depender: state
1990 .resolve_full_name(entry.name(), entry.conn_id())
1991 .to_string(),
1992 dependee: format!("{database_name}.{cur_name}"),
1993 message: format!("ambiguous reference to schema named {s}"),
1994 }))
1995 })?;
1996
1997 if !new_entry.item().is_temporary() {
1999 items_to_update.insert(*id, new_entry.into());
2000 } else {
2001 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2002 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2003 }
2004 updates.push(id);
2005
2006 Ok::<_, AdapterError>(())
2007 };
2008
2009 for (_name, item_id) in &schema.items {
2011 update_item(item_id)?;
2013
2014 for id in state.get_entry(item_id).referenced_by() {
2016 update_item(id)?;
2017 }
2018 }
2019 tx.update_items(items_to_update)?;
2022
2023 let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2025 let schema_name = schema.name().schema.clone();
2026 return Err(AdapterError::Catalog(crate::catalog::Error::new(
2027 crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2028 )));
2029 };
2030
2031 let database_name = database_spec
2033 .id()
2034 .map(|id| state.get_database(&id).name.clone());
2035 let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2036 id: schema_id.to_string(),
2037 old_name: schema.name().schema.clone(),
2038 new_name: new_name.clone(),
2039 database_name,
2040 });
2041 CatalogState::add_to_audit_log(
2042 &state.system_configuration,
2043 oracle_write_ts,
2044 session,
2045 tx,
2046 audit_events,
2047 EventType::Alter,
2048 mz_audit_log::ObjectType::Schema,
2049 details,
2050 )?;
2051
2052 let mut new_schema = schema.clone();
2054 new_schema.name.schema.clone_from(&new_name);
2055 tx.update_schema(schema_id, new_schema.into())?;
2056
2057 for id in updates {
2058 Self::log_update(state, id);
2059 }
2060 }
2061 Op::UpdateOwner { id, new_owner } => {
2062 let conn_id = session
2063 .map(|session| session.conn_id())
2064 .unwrap_or(&SYSTEM_CONN_ID);
2065 let old_owner = state
2066 .get_owner_id(&id, conn_id)
2067 .expect("cannot update the owner of an object without an owner");
2068 match &id {
2069 ObjectId::Cluster(id) => {
2070 let mut cluster = state.get_cluster(*id).clone();
2071 if id.is_system() {
2072 return Err(AdapterError::Catalog(Error::new(
2073 ErrorKind::ReadOnlyCluster(cluster.name),
2074 )));
2075 }
2076 Self::update_privilege_owners(
2077 &mut cluster.privileges,
2078 cluster.owner_id,
2079 new_owner,
2080 );
2081 cluster.owner_id = new_owner;
2082 tx.update_cluster(*id, cluster.into())?;
2083 }
2084 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2085 let cluster = state.get_cluster(*cluster_id);
2086 let mut replica = cluster
2087 .replica(*replica_id)
2088 .expect("catalog out of sync")
2089 .clone();
2090 if replica_id.is_system() {
2091 return Err(AdapterError::Catalog(Error::new(
2092 ErrorKind::ReadOnlyClusterReplica(replica.name),
2093 )));
2094 }
2095 replica.owner_id = new_owner;
2096 tx.update_cluster_replica(*replica_id, replica.into())?;
2097 }
2098 ObjectId::Database(id) => {
2099 let mut database = state.get_database(id).clone();
2100 if id.is_system() {
2101 return Err(AdapterError::Catalog(Error::new(
2102 ErrorKind::ReadOnlyDatabase(database.name),
2103 )));
2104 }
2105 Self::update_privilege_owners(
2106 &mut database.privileges,
2107 database.owner_id,
2108 new_owner,
2109 );
2110 database.owner_id = new_owner;
2111 tx.update_database(*id, database.clone().into())?;
2112 }
2113 ObjectId::Schema((database_spec, schema_spec)) => {
2114 let schema_id: SchemaId = schema_spec.clone().into();
2115 let mut schema = state
2116 .get_schema(database_spec, schema_spec, conn_id)
2117 .clone();
2118 if schema_id.is_system() {
2119 let name = schema.name();
2120 let full_name = state.resolve_full_schema_name(name);
2121 return Err(AdapterError::Catalog(Error::new(
2122 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2123 )));
2124 }
2125 Self::update_privilege_owners(
2126 &mut schema.privileges,
2127 schema.owner_id,
2128 new_owner,
2129 );
2130 schema.owner_id = new_owner;
2131 tx.update_schema(schema_id, schema.into())?;
2132 }
2133 ObjectId::Item(id) => {
2134 let entry = state.get_entry(id);
2135 let mut new_entry = entry.clone();
2136 if id.is_system() {
2137 let full_name = state.resolve_full_name(
2138 new_entry.name(),
2139 session.map(|session| session.conn_id()),
2140 );
2141 return Err(AdapterError::Catalog(Error::new(
2142 ErrorKind::ReadOnlyItem(full_name.to_string()),
2143 )));
2144 }
2145 Self::update_privilege_owners(
2146 &mut new_entry.privileges,
2147 new_entry.owner_id,
2148 new_owner,
2149 );
2150 new_entry.owner_id = new_owner;
2151 if !new_entry.item().is_temporary() {
2152 tx.update_item(*id, new_entry.into())?;
2153 } else {
2154 temporary_item_updates
2155 .push((entry.clone().into(), StateDiff::Retraction));
2156 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2157 }
2158 }
2159 ObjectId::NetworkPolicy(id) => {
2160 let mut policy = state.get_network_policy(id).clone();
2161 if id.is_system() {
2162 return Err(AdapterError::Catalog(Error::new(
2163 ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2164 )));
2165 }
2166 Self::update_privilege_owners(
2167 &mut policy.privileges,
2168 policy.owner_id,
2169 new_owner,
2170 );
2171 policy.owner_id = new_owner;
2172 tx.update_network_policy(*id, policy.into())?;
2173 }
2174 ObjectId::Role(_) => unreachable!("roles have no owner"),
2175 }
2176 let object_type = state.get_object_type(&id);
2177 CatalogState::add_to_audit_log(
2178 &state.system_configuration,
2179 oracle_write_ts,
2180 session,
2181 tx,
2182 audit_events,
2183 EventType::Alter,
2184 object_type_to_audit_object_type(object_type),
2185 EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2186 object_id: id.to_string(),
2187 old_owner_id: old_owner.to_string(),
2188 new_owner_id: new_owner.to_string(),
2189 }),
2190 )?;
2191 }
2192 Op::UpdateClusterConfig { id, name, config } => {
2193 let mut cluster = state.get_cluster(id).clone();
2194 cluster.config = config;
2195 tx.update_cluster(id, cluster.into())?;
2196 info!("update cluster {}", name);
2197
2198 CatalogState::add_to_audit_log(
2199 &state.system_configuration,
2200 oracle_write_ts,
2201 session,
2202 tx,
2203 audit_events,
2204 EventType::Alter,
2205 ObjectType::Cluster,
2206 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2207 id: id.to_string(),
2208 name,
2209 }),
2210 )?;
2211 }
2212 Op::UpdateClusterReplicaConfig {
2213 replica_id,
2214 cluster_id,
2215 config,
2216 } => {
2217 let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2218 info!("update replica {}", replica.name);
2219 tx.update_cluster_replica(
2220 replica_id,
2221 mz_catalog::durable::ClusterReplica {
2222 cluster_id,
2223 replica_id,
2224 name: replica.name.clone(),
2225 config: config.clone().into(),
2226 owner_id: replica.owner_id,
2227 },
2228 )?;
2229 }
2230 Op::UpdateItem { id, name, to_item } => {
2231 let mut entry = state.get_entry(&id).clone();
2232 entry.name = name.clone();
2233 entry.item = to_item.clone();
2234 tx.update_item(id, entry.into())?;
2235
2236 if Self::should_audit_log_item(&to_item) {
2237 let mut full_name = Self::full_name_detail(
2238 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2239 );
2240 full_name.item = name.item;
2241
2242 CatalogState::add_to_audit_log(
2243 &state.system_configuration,
2244 oracle_write_ts,
2245 session,
2246 tx,
2247 audit_events,
2248 EventType::Alter,
2249 catalog_type_to_audit_object_type(to_item.typ()),
2250 EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2251 id: id.to_string(),
2252 name: full_name,
2253 }),
2254 )?;
2255 }
2256
2257 Self::log_update(state, &id);
2258 }
2259 Op::UpdateSystemConfiguration { name, value } => {
2260 let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2261 tx.upsert_system_config(&name, parsed_value.clone())?;
2262 if name == ENABLE_0DT_DEPLOYMENT.name() {
2266 let enable_0dt_deployment =
2267 strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2268 tx.set_enable_0dt_deployment(enable_0dt_deployment)?;
2269 } else if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2270 let with_0dt_deployment_max_wait =
2271 Duration::parse(VarInput::Flat(&parsed_value))
2272 .expect("parsing succeeded above");
2273 tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2274 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2275 let with_0dt_deployment_ddl_check_interval =
2276 Duration::parse(VarInput::Flat(&parsed_value))
2277 .expect("parsing succeeded above");
2278 tx.set_0dt_deployment_ddl_check_interval(
2279 with_0dt_deployment_ddl_check_interval,
2280 )?;
2281 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2282 let panic_after_timeout =
2283 strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2284 tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2285 }
2286
2287 CatalogState::add_to_audit_log(
2288 &state.system_configuration,
2289 oracle_write_ts,
2290 session,
2291 tx,
2292 audit_events,
2293 EventType::Alter,
2294 ObjectType::System,
2295 EventDetails::SetV1(mz_audit_log::SetV1 {
2296 name,
2297 value: Some(value.borrow().to_vec().join(", ")),
2298 }),
2299 )?;
2300 }
2301 Op::ResetSystemConfiguration { name } => {
2302 tx.remove_system_config(&name);
2303 if name == ENABLE_0DT_DEPLOYMENT.name() {
2307 tx.reset_enable_0dt_deployment()?;
2308 } else if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2309 tx.reset_0dt_deployment_max_wait()?;
2310 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2311 tx.reset_0dt_deployment_ddl_check_interval()?;
2312 }
2313
2314 CatalogState::add_to_audit_log(
2315 &state.system_configuration,
2316 oracle_write_ts,
2317 session,
2318 tx,
2319 audit_events,
2320 EventType::Alter,
2321 ObjectType::System,
2322 EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2323 )?;
2324 }
2325 Op::ResetAllSystemConfiguration => {
2326 tx.clear_system_configs();
2327 tx.reset_enable_0dt_deployment()?;
2328 tx.reset_0dt_deployment_max_wait()?;
2329 tx.reset_0dt_deployment_ddl_check_interval()?;
2330
2331 CatalogState::add_to_audit_log(
2332 &state.system_configuration,
2333 oracle_write_ts,
2334 session,
2335 tx,
2336 audit_events,
2337 EventType::Alter,
2338 ObjectType::System,
2339 EventDetails::ResetAllV1,
2340 )?;
2341 }
2342 Op::WeirdStorageUsageUpdates {
2343 object_id,
2344 size_bytes,
2345 collection_timestamp,
2346 } => {
2347 let id = tx.allocate_storage_usage_ids()?;
2348 let metric =
2349 VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2350 let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2351 let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2352 weird_builtin_table_update = Some(builtin_table_update);
2353 }
2354 };
2355 Ok((weird_builtin_table_update, temporary_item_updates))
2356 }
2357
2358 fn log_update(state: &CatalogState, id: &CatalogItemId) {
2359 let entry = state.get_entry(id);
2360 info!(
2361 "update {} {} ({})",
2362 entry.item_type(),
2363 state.resolve_full_name(entry.name(), entry.conn_id()),
2364 id
2365 );
2366 }
2367
2368 fn update_privilege_owners(
2372 privileges: &mut PrivilegeMap,
2373 old_owner: RoleId,
2374 new_owner: RoleId,
2375 ) {
2376 let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2378
2379 let mut new_present = false;
2380 for privilege in flat_privileges.iter_mut() {
2381 if privilege.grantor == old_owner {
2384 privilege.grantor = new_owner;
2385 } else if privilege.grantor == new_owner {
2386 new_present = true;
2387 }
2388 if privilege.grantee == old_owner {
2390 privilege.grantee = new_owner;
2391 } else if privilege.grantee == new_owner {
2392 new_present = true;
2393 }
2394 }
2395
2396 if new_present {
2400 let privilege_map: BTreeMap<_, Vec<_>> =
2402 flat_privileges
2403 .into_iter()
2404 .fold(BTreeMap::new(), |mut accum, privilege| {
2405 accum
2406 .entry((privilege.grantee, privilege.grantor))
2407 .or_default()
2408 .push(privilege);
2409 accum
2410 });
2411
2412 flat_privileges = privilege_map
2414 .into_iter()
2415 .map(|((grantee, grantor), values)|
2416 values.into_iter().fold(
2418 MzAclItem::empty(grantee, grantor),
2419 |mut accum, mz_aclitem| {
2420 accum.acl_mode =
2421 accum.acl_mode.union(mz_aclitem.acl_mode);
2422 accum
2423 },
2424 ))
2425 .collect();
2426 }
2427
2428 *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2429 }
2430}
2431
2432#[derive(Debug, Default)]
2440pub(crate) struct ObjectsToDrop {
2441 pub comments: BTreeSet<CommentObjectId>,
2442 pub databases: BTreeSet<DatabaseId>,
2443 pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2444 pub clusters: BTreeSet<ClusterId>,
2445 pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2446 pub roles: BTreeSet<RoleId>,
2447 pub items: Vec<CatalogItemId>,
2448 pub network_policies: BTreeSet<NetworkPolicyId>,
2449}
2450
2451impl ObjectsToDrop {
2452 pub fn generate(
2453 drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2454 state: &CatalogState,
2455 session: Option<&ConnMeta>,
2456 ) -> Result<Self, AdapterError> {
2457 let mut delta = ObjectsToDrop::default();
2458
2459 for drop_object_info in drop_object_infos {
2460 delta.add_item(drop_object_info, state, session)?;
2461 }
2462
2463 Ok(delta)
2464 }
2465
2466 fn add_item(
2467 &mut self,
2468 drop_object_info: DropObjectInfo,
2469 state: &CatalogState,
2470 session: Option<&ConnMeta>,
2471 ) -> Result<(), AdapterError> {
2472 self.comments
2473 .insert(state.get_comment_id(drop_object_info.to_object_id()));
2474
2475 match drop_object_info {
2476 DropObjectInfo::Database(database_id) => {
2477 let database = &state.database_by_id[&database_id];
2478 if database_id.is_system() {
2479 return Err(AdapterError::Catalog(Error::new(
2480 ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2481 )));
2482 }
2483
2484 self.databases.insert(database_id);
2485 }
2486 DropObjectInfo::Schema((database_spec, schema_spec)) => {
2487 let schema = state.get_schema(
2488 &database_spec,
2489 &schema_spec,
2490 session
2491 .map(|session| session.conn_id())
2492 .unwrap_or(&SYSTEM_CONN_ID),
2493 );
2494 let schema_id: SchemaId = schema_spec.into();
2495 if schema_id.is_system() {
2496 let name = schema.name();
2497 let full_name = state.resolve_full_schema_name(name);
2498 return Err(AdapterError::Catalog(Error::new(
2499 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2500 )));
2501 }
2502
2503 self.schemas.insert(schema_spec, database_spec);
2504 }
2505 DropObjectInfo::Role(role_id) => {
2506 let name = state.get_role(&role_id).name().to_string();
2507 if role_id.is_system() || role_id.is_predefined() {
2508 return Err(AdapterError::Catalog(Error::new(
2509 ErrorKind::ReservedRoleName(name.clone()),
2510 )));
2511 }
2512 state.ensure_not_reserved_role(&role_id)?;
2513
2514 self.roles.insert(role_id);
2515 }
2516 DropObjectInfo::Cluster(cluster_id) => {
2517 let cluster = state.get_cluster(cluster_id);
2518 let name = &cluster.name;
2519 if cluster_id.is_system() {
2520 return Err(AdapterError::Catalog(Error::new(
2521 ErrorKind::ReadOnlyCluster(name.clone()),
2522 )));
2523 }
2524
2525 self.clusters.insert(cluster_id);
2526 }
2527 DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2528 let cluster = state.get_cluster(cluster_id);
2529 let replica = cluster.replica(replica_id).expect("Must exist");
2530
2531 self.replicas
2532 .insert(replica.replica_id, (cluster.id, reason));
2533 }
2534 DropObjectInfo::Item(item_id) => {
2535 let entry = state.get_entry(&item_id);
2536 if item_id.is_system() {
2537 let name = entry.name();
2538 let full_name =
2539 state.resolve_full_name(name, session.map(|session| session.conn_id()));
2540 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2541 full_name.to_string(),
2542 ))));
2543 }
2544
2545 self.items.push(item_id);
2546 }
2547 DropObjectInfo::NetworkPolicy(network_policy_id) => {
2548 let policy = state.get_network_policy(&network_policy_id);
2549 let name = &policy.name;
2550 if network_policy_id.is_system() {
2551 return Err(AdapterError::Catalog(Error::new(
2552 ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
2553 )));
2554 }
2555
2556 self.network_policies.insert(network_policy_id);
2557 }
2558 }
2559
2560 Ok(())
2561 }
2562}
2563
#[cfg(test)]
mod tests {
    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
    use mz_repr::role_id::RoleId;

    use crate::catalog::Catalog;

    /// Transferring ownership must fold the old owner's privileges into the
    /// matching privileges of the new owner, leaving a single ACL item whose
    /// mode is the union of both.
    #[mz_ore::test]
    fn test_update_privilege_owners() {
        let old_owner = RoleId::User(1);
        let new_owner = RoleId::User(2);
        let other_role = RoleId::User(3);

        let acl = |grantee: RoleId, grantor: RoleId, acl_mode: AclMode| MzAclItem {
            grantee,
            grantor,
            acl_mode,
        };
        let select_and_update = || AclMode::SELECT.union(AclMode::UPDATE);

        // Each scenario: two starting ACL items and the single item expected
        // once ownership has moved from `old_owner` to `new_owner`.
        let scenarios = [
            // Old and new owner both granted a privilege to the same role.
            (
                acl(other_role, old_owner, AclMode::UPDATE),
                acl(other_role, new_owner, AclMode::SELECT),
                acl(other_role, new_owner, select_and_update()),
            ),
            // Old and new owner were both granted a privilege by the same role.
            (
                acl(old_owner, other_role, AclMode::UPDATE),
                acl(new_owner, other_role, AclMode::SELECT),
                acl(new_owner, other_role, select_and_update()),
            ),
            // Old and new owner each granted a privilege to themselves.
            (
                acl(old_owner, old_owner, AclMode::UPDATE),
                acl(new_owner, new_owner, AclMode::SELECT),
                acl(new_owner, new_owner, select_and_update()),
            ),
        ];

        for (first, second, expected) in scenarios {
            let mut privileges = PrivilegeMap::from_mz_acl_items(vec![first, second]);
            Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
            assert_eq!(1, privileges.all_values().count());
            assert_eq!(
                vec![expected],
                privileges.all_values_owned().collect::<Vec<_>>()
            );
        }
    }
}