1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21 ENABLE_0DT_DEPLOYMENT, ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT,
22 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26 ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Transaction};
31use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
32use mz_catalog::memory::objects::{
33 CatalogItem, ClusterConfig, DataSourceDesc, SourceReferences, StateDiff, StateUpdate,
34 StateUpdateKind, TemporaryItem,
35};
36use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
37use mz_controller_types::{ClusterId, ReplicaId};
38use mz_ore::collections::HashSet;
39use mz_ore::instrument;
40use mz_ore::now::EpochMillis;
41use mz_persist_types::ShardId;
42use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
43use mz_repr::network_policy_id::NetworkPolicyId;
44use mz_repr::role_id::RoleId;
45use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
46use mz_sql::ast::RawDataType;
47use mz_sql::catalog::{
48 CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
49 CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, RoleAttributes, RoleMembership,
50 RoleVars,
51};
52use mz_sql::names::{
53 CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
54 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
55};
56use mz_sql::plan::{NetworkPolicyRule, PlanError};
57use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
58use mz_sql::session::vars::OwnedVarInput;
59use mz_sql::session::vars::{Value as VarValue, VarInput};
60use mz_sql::{DEFAULT_SCHEMA, rbac};
61use mz_sql_parser::ast::{QualifiedReplica, Value};
62use mz_storage_client::storage_collections::StorageCollections;
63use tracing::{info, trace};
64
65use crate::AdapterError;
66use crate::catalog::{
67 BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
68 catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
69 is_reserved_role_name, object_type_to_audit_object_type,
70 system_object_type_to_audit_object_type,
71};
72use crate::coord::ConnMeta;
73use crate::coord::cluster_scheduling::SchedulingDecision;
74use crate::util::ResultExt;
75
/// A single catalog operation, submitted in a batch to `Catalog::transact`.
///
/// Each op in the batch is handed to `Catalog::transact_op`, which stages the
/// corresponding changes on the durable catalog transaction.
#[derive(Debug, Clone)]
pub enum Op {
    /// Updates the retain-history setting of the item `id`.
    ///
    /// System items are rejected with a read-only error.
    AlterRetainHistory {
        id: CatalogItemId,
        /// The new setting as written by the user; recorded as `new_history`
        /// in the audit log.
        value: Option<Value>,
        window: CompactionWindow,
    },
    /// Replaces the attributes and variables of the role `id`.
    AlterRole {
        id: RoleId,
        /// Used for the audit log entry and the log message.
        name: String,
        attributes: RoleAttributes,
        vars: RoleVars,
    },
    /// Replaces the rules of the network policy `id`.
    ///
    /// Fails if `name` is a reserved name.
    AlterNetworkPolicy {
        id: NetworkPolicyId,
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    /// Adds a column to the table `id`.
    ///
    /// `new_global_id` is recorded as the collection for the new table version
    /// and registered against the shard of the table's latest global ID.
    /// Items that are not tables are rejected as unsupported.
    AlterAddColumn {
        id: CatalogItemId,
        new_global_id: GlobalId,
        name: ColumnName,
        typ: SqlColumnType,
        sql: RawDataType,
    },
    /// Creates a user database together with its `DEFAULT_SCHEMA` schema.
    CreateDatabase {
        name: String,
        owner_id: RoleId,
    },
    /// Creates a user schema.
    ///
    /// Reserved schema names and the ambient database specifier are rejected.
    CreateSchema {
        database_id: ResolvedDatabaseSpecifier,
        schema_name: String,
        owner_id: RoleId,
    },
    /// Creates a user role with empty membership and default variables.
    ///
    /// Reserved role names are rejected.
    CreateRole {
        name: String,
        attributes: RoleAttributes,
    },
    /// Creates a user cluster. Reserved cluster names are rejected.
    CreateCluster {
        id: ClusterId,
        name: String,
        /// Builtin logs for which introspection source index IDs are
        /// allocated on the new cluster.
        introspection_sources: Vec<&'static BuiltinLog>,
        owner_id: RoleId,
        config: ClusterConfig,
    },
    /// Creates a replica in the cluster `cluster_id`. Reserved replica names
    /// are rejected.
    CreateClusterReplica {
        cluster_id: ClusterId,
        name: String,
        config: ReplicaConfig,
        owner_id: RoleId,
        /// Why the replica is created; recorded in the audit log for managed
        /// replicas.
        reason: ReplicaCreateDropReason,
    },
    /// Creates a catalog item.
    ///
    /// Temporary items are applied as temporary-item state updates; all other
    /// items are inserted into the durable catalog transaction.
    CreateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        item: CatalogItem,
        owner_id: RoleId,
    },
    /// Creates a network policy.
    ///
    /// Fails if a policy named `name` already exists or if `name` is reserved.
    CreateNetworkPolicy {
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    Comment {
        object_id: CommentObjectId,
        sub_component: Option<usize>,
        comment: Option<String>,
    },
    DropObjects(Vec<DropObjectInfo>),
    GrantRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    RenameCluster {
        id: ClusterId,
        name: String,
        to_name: String,
        check_reserved_names: bool,
    },
    RenameClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: QualifiedReplica,
        to_name: String,
    },
    RenameItem {
        id: CatalogItemId,
        current_full_name: FullItemName,
        to_name: String,
    },
    RenameSchema {
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        new_name: String,
        check_reserved_names: bool,
    },
    UpdateOwner {
        id: ObjectId,
        new_owner: RoleId,
    },
    UpdatePrivilege {
        target_id: SystemObjectId,
        privilege: MzAclItem,
        variant: UpdatePrivilegeVariant,
    },
    UpdateDefaultPrivilege {
        privilege_object: DefaultPrivilegeObject,
        privilege_acl_item: DefaultPrivilegeAclItem,
        variant: UpdatePrivilegeVariant,
    },
    RevokeRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    UpdateClusterConfig {
        id: ClusterId,
        name: String,
        config: ClusterConfig,
    },
    UpdateClusterReplicaConfig {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        config: ReplicaConfig,
    },
    UpdateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        to_item: CatalogItem,
    },
    UpdateSourceReferences {
        source_id: CatalogItemId,
        references: SourceReferences,
    },
    UpdateSystemConfiguration {
        name: String,
        value: OwnedVarInput,
    },
    ResetSystemConfiguration {
        name: String,
    },
    ResetAllSystemConfiguration,
    WeirdStorageUsageUpdates {
        object_id: Option<String>,
        size_bytes: u64,
        collection_timestamp: EpochMillis,
    },
    /// Marks the batch as a dry run.
    ///
    /// Must be the last op of a batch and must not be the only op. When
    /// present, `Catalog::transact_inner` applies the preceding ops and then
    /// returns `AdapterError::TransactionDryRun` carrying those ops and the
    /// resulting state instead of a success value.
    TransactionDryRun,
}
238
/// Identifies one object to drop; the payload of [`Op::DropObjects`].
///
/// Mirrors the variants of [`ObjectId`], except that cluster replicas also
/// carry a [`ReplicaCreateDropReason`].
#[derive(Debug, Clone)]
pub enum DropObjectInfo {
    Cluster(ClusterId),
    /// The replica to drop, together with the reason for dropping it.
    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
    Database(DatabaseId),
    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
    Role(RoleId),
    Item(CatalogItemId),
    NetworkPolicy(NetworkPolicyId),
}
252
253impl DropObjectInfo {
254 pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
257 match id {
258 ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
259 ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
260 cluster_id,
261 replica_id,
262 ReplicaCreateDropReason::Manual,
263 )),
264 ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
265 ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
266 ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
267 ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
268 ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
269 }
270 }
271
272 fn to_object_id(&self) -> ObjectId {
275 match &self {
276 DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
277 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
278 ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
279 }
280 DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
281 DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
282 DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
283 DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
284 DropObjectInfo::NetworkPolicy(network_policy_id) => {
285 ObjectId::NetworkPolicy(network_policy_id.clone())
286 }
287 }
288 }
289}
290
/// Why a cluster replica is being created or dropped.
///
/// Converted into its audit log representation by
/// [`ReplicaCreateDropReason::into_audit_log`].
#[derive(Debug, Clone)]
pub enum ReplicaCreateDropReason {
    /// Maps to `CreateOrDropClusterReplicaReasonV1::Manual` in the audit log.
    Manual,
    /// Maps to `CreateOrDropClusterReplicaReasonV1::Schedule` in the audit
    /// log; carries the scheduling decisions that are logged alongside it.
    ClusterScheduling(Vec<SchedulingDecision>),
}
303
304impl ReplicaCreateDropReason {
305 pub fn into_audit_log(
306 self,
307 ) -> (
308 CreateOrDropClusterReplicaReasonV1,
309 Option<SchedulingDecisionsWithReasonsV2>,
310 ) {
311 let (reason, scheduling_policies) = match self {
312 ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
313 ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
314 CreateOrDropClusterReplicaReasonV1::Schedule,
315 Some(scheduling_decisions),
316 ),
317 };
318 (
319 reason,
320 scheduling_policies
321 .as_ref()
322 .map(SchedulingDecision::reasons_to_audit_log_reasons),
323 )
324 }
325}
326
/// The value returned by a successful `Catalog::transact` call.
pub struct TransactionResult {
    /// Builtin table updates collected while applying the ops.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Audit events collected while applying the ops.
    pub audit_events: Vec<VersionedEvent>,
}
331
332impl Catalog {
333 fn should_audit_log_item(item: &CatalogItem) -> bool {
334 !item.is_temporary()
335 }
336
337 fn temporary_ids(
340 &self,
341 ops: &[Op],
342 temporary_drops: BTreeSet<(&ConnectionId, String)>,
343 ) -> Result<BTreeSet<CatalogItemId>, Error> {
344 let mut creating = BTreeSet::new();
345 let mut temporary_ids = BTreeSet::new();
346 for op in ops.iter() {
347 if let Op::CreateItem {
348 id,
349 name,
350 item,
351 owner_id: _,
352 } = op
353 {
354 if let Some(conn_id) = item.conn_id() {
355 if self.item_exists_in_temp_schemas(conn_id, &name.item)
356 && !temporary_drops.contains(&(conn_id, name.item.clone()))
357 || creating.contains(&(conn_id, &name.item))
358 {
359 return Err(
360 SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
361 );
362 } else {
363 creating.insert((conn_id, &name.item));
364 temporary_ids.insert(id.clone());
365 }
366 }
367 }
368 }
369 Ok(temporary_ids)
370 }
371
372 #[instrument(name = "catalog::transact")]
373 pub async fn transact(
374 &mut self,
375 storage_collections: Option<
378 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
379 >,
380 oracle_write_ts: mz_repr::Timestamp,
381 session: Option<&ConnMeta>,
382 ops: Vec<Op>,
383 ) -> Result<TransactionResult, AdapterError> {
384 trace!("transact: {:?}", ops);
385 fail::fail_point!("catalog_transact", |arg| {
386 Err(AdapterError::Unstructured(anyhow::anyhow!(
387 "failpoint: {arg:?}"
388 )))
389 });
390
391 let drop_ids: BTreeSet<CatalogItemId> = ops
392 .iter()
393 .filter_map(|op| match op {
394 Op::DropObjects(drop_object_infos) => {
395 let ids = drop_object_infos.iter().map(|info| info.to_object_id());
396 let item_ids = ids.filter_map(|id| match id {
397 ObjectId::Item(id) => Some(id),
398 _ => None,
399 });
400 Some(item_ids)
401 }
402 _ => None,
403 })
404 .flatten()
405 .collect();
406 let temporary_drops = drop_ids
407 .iter()
408 .filter_map(|id| {
409 let entry = self.get_entry(id);
410 match entry.item().conn_id() {
411 Some(conn_id) => Some((conn_id, entry.name().item.clone())),
412 None => None,
413 }
414 })
415 .collect();
416 let dropped_global_ids = drop_ids
417 .iter()
418 .flat_map(|item_id| self.get_global_ids(item_id))
419 .collect();
420
421 let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
422 let mut builtin_table_updates = vec![];
423 let mut audit_events = vec![];
424 let mut storage = self.storage().await;
425 let mut tx = storage
426 .transaction()
427 .await
428 .unwrap_or_terminate("starting catalog transaction");
429
430 let new_state = Self::transact_inner(
431 storage_collections,
432 oracle_write_ts,
433 session,
434 ops,
435 temporary_ids,
436 &mut builtin_table_updates,
437 &mut audit_events,
438 &mut tx,
439 &self.state,
440 )
441 .await?;
442
443 tx.commit(oracle_write_ts)
448 .await
449 .unwrap_or_terminate("catalog storage transaction commit must succeed");
450
451 drop(storage);
454 if let Some(new_state) = new_state {
455 self.transient_revision += 1;
456 self.state = new_state;
457 }
458
459 let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
461 if self.state.system_config().enable_mz_notices() {
462 self.state().pack_optimizer_notices(
464 &mut builtin_table_updates,
465 dropped_notices.iter(),
466 Diff::MINUS_ONE,
467 );
468 }
469
470 Ok(TransactionResult {
471 builtin_table_updates,
472 audit_events,
473 })
474 }
475
476 #[instrument(name = "catalog::transact_inner")]
484 async fn transact_inner(
485 storage_collections: Option<
486 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
487 >,
488 oracle_write_ts: mz_repr::Timestamp,
489 session: Option<&ConnMeta>,
490 mut ops: Vec<Op>,
491 temporary_ids: BTreeSet<CatalogItemId>,
492 builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
493 audit_events: &mut Vec<VersionedEvent>,
494 tx: &mut Transaction<'_>,
495 state: &CatalogState,
496 ) -> Result<Option<CatalogState>, AdapterError> {
497 let mut state = Cow::Borrowed(state);
498
499 let dry_run_ops = match ops.last() {
500 Some(Op::TransactionDryRun) => {
501 ops.pop();
503 assert!(!ops.is_empty(), "TransactionDryRun must not be the only op");
504 ops.clone()
505 }
506 Some(_) => vec![],
507 None => return Ok(None),
508 };
509
510 let mut storage_collections_to_create = BTreeSet::new();
511 let mut storage_collections_to_drop = BTreeSet::new();
512 let mut storage_collections_to_register = BTreeMap::new();
513
514 for op in ops {
515 let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
516 oracle_write_ts,
517 session,
518 op,
519 &temporary_ids,
520 audit_events,
521 tx,
522 &*state,
523 &mut storage_collections_to_create,
524 &mut storage_collections_to_drop,
525 &mut storage_collections_to_register,
526 )
527 .await?;
528
529 if let Some(builtin_table_update) = weird_builtin_table_update {
538 builtin_table_updates.push(builtin_table_update);
539 }
540
541 let op_id = tx.op_id().into();
546 let temporary_item_updates =
547 temporary_item_updates
548 .into_iter()
549 .map(|(item, diff)| StateUpdate {
550 kind: StateUpdateKind::TemporaryItem(item),
551 ts: op_id,
552 diff,
553 });
554
555 let mut updates: Vec<_> = tx.get_and_commit_op_updates();
556 updates.extend(temporary_item_updates);
557 if !updates.is_empty() {
558 let op_builtin_table_updates = state.to_mut().apply_updates(updates)?;
559 let op_builtin_table_updates = state
560 .to_mut()
561 .resolve_builtin_table_updates(op_builtin_table_updates);
562 builtin_table_updates.extend(op_builtin_table_updates);
563 }
564 }
565
566 if dry_run_ops.is_empty() {
567 if let Some(c) = storage_collections {
569 c.prepare_state(
570 tx,
571 storage_collections_to_create,
572 storage_collections_to_drop,
573 storage_collections_to_register,
574 )
575 .await?;
576 }
577
578 let updates = tx.get_and_commit_op_updates();
579 if !updates.is_empty() {
580 let op_builtin_table_updates = state.to_mut().apply_updates(updates)?;
581 let op_builtin_table_updates = state
582 .to_mut()
583 .resolve_builtin_table_updates(op_builtin_table_updates);
584 builtin_table_updates.extend(op_builtin_table_updates);
585 }
586
587 match state {
588 Cow::Owned(state) => Ok(Some(state)),
589 Cow::Borrowed(_) => Ok(None),
590 }
591 } else {
592 Err(AdapterError::TransactionDryRun {
593 new_ops: dry_run_ops,
594 new_state: state.into_owned(),
595 })
596 }
597 }
598
599 #[instrument]
607 async fn transact_op(
608 oracle_write_ts: mz_repr::Timestamp,
609 session: Option<&ConnMeta>,
610 op: Op,
611 temporary_ids: &BTreeSet<CatalogItemId>,
612 audit_events: &mut Vec<VersionedEvent>,
613 tx: &mut Transaction<'_>,
614 state: &CatalogState,
615 storage_collections_to_create: &mut BTreeSet<GlobalId>,
616 storage_collections_to_drop: &mut BTreeSet<GlobalId>,
617 storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
618 ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
619 let mut weird_builtin_table_update = None;
620 let mut temporary_item_updates = Vec::new();
621
622 match op {
623 Op::TransactionDryRun => {
624 unreachable!("TransactionDryRun can only be used a final element of ops")
625 }
626 Op::AlterRetainHistory { id, value, window } => {
627 let entry = state.get_entry(&id);
628 if id.is_system() {
629 let name = entry.name();
630 let full_name =
631 state.resolve_full_name(name, session.map(|session| session.conn_id()));
632 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
633 full_name.to_string(),
634 ))));
635 }
636
637 let mut new_entry = entry.clone();
638 let previous = new_entry
639 .item
640 .update_retain_history(value.clone(), window)
641 .map_err(|_| {
642 AdapterError::Catalog(Error::new(ErrorKind::Internal(
643 "planner should have rejected invalid alter retain history item type"
644 .to_string(),
645 )))
646 })?;
647
648 if Self::should_audit_log_item(new_entry.item()) {
649 let details =
650 EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
651 id: id.to_string(),
652 old_history: previous.map(|previous| previous.to_string()),
653 new_history: value.map(|v| v.to_string()),
654 });
655 CatalogState::add_to_audit_log(
656 &state.system_configuration,
657 oracle_write_ts,
658 session,
659 tx,
660 audit_events,
661 EventType::Alter,
662 catalog_type_to_audit_object_type(new_entry.item().typ()),
663 details,
664 )?;
665 }
666
667 tx.update_item(id, new_entry.into())?;
668
669 Self::log_update(state, &id);
670 }
671 Op::AlterRole {
672 id,
673 name,
674 attributes,
675 vars,
676 } => {
677 state.ensure_not_reserved_role(&id)?;
678
679 let mut existing_role = state.get_role(&id).clone();
680 existing_role.attributes = attributes;
681 existing_role.vars = vars;
682 tx.update_role(id, existing_role.into())?;
683
684 CatalogState::add_to_audit_log(
685 &state.system_configuration,
686 oracle_write_ts,
687 session,
688 tx,
689 audit_events,
690 EventType::Alter,
691 ObjectType::Role,
692 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
693 id: id.to_string(),
694 name: name.clone(),
695 }),
696 )?;
697
698 info!("update role {name} ({id})");
699 }
700 Op::AlterNetworkPolicy {
701 id,
702 rules,
703 name,
704 owner_id: _owner_id,
705 } => {
706 let existing_policy = state.get_network_policy(&id).clone();
707 let mut policy: NetworkPolicy = existing_policy.into();
708 policy.rules = rules;
709 if is_reserved_name(&name) {
710 return Err(AdapterError::Catalog(Error::new(
711 ErrorKind::ReservedNetworkPolicyName(name),
712 )));
713 }
714 tx.update_network_policy(id, policy.clone())?;
715
716 CatalogState::add_to_audit_log(
717 &state.system_configuration,
718 oracle_write_ts,
719 session,
720 tx,
721 audit_events,
722 EventType::Alter,
723 ObjectType::NetworkPolicy,
724 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
725 id: id.to_string(),
726 name: name.clone(),
727 }),
728 )?;
729
730 info!("update network policy {name} ({id})");
731 }
732 Op::AlterAddColumn {
733 id,
734 new_global_id,
735 name,
736 typ,
737 sql,
738 } => {
739 let mut new_entry = state.get_entry(&id).clone();
740 let version = new_entry.item.add_column(name, typ, sql)?;
741 let shard_id = state
744 .storage_metadata()
745 .get_collection_shard(new_entry.latest_global_id())?;
746
747 let CatalogItem::Table(table) = &mut new_entry.item else {
749 return Err(AdapterError::Unsupported("adding columns to non-Table"));
750 };
751 table.collections.insert(version, new_global_id);
752
753 tx.update_item(id, new_entry.into())?;
754 storage_collections_to_register.insert(new_global_id, shard_id);
755 }
756 Op::CreateDatabase { name, owner_id } => {
757 let database_owner_privileges = vec![rbac::owner_privilege(
758 mz_sql::catalog::ObjectType::Database,
759 owner_id,
760 )];
761 let database_default_privileges = state
762 .default_privileges
763 .get_applicable_privileges(
764 owner_id,
765 None,
766 None,
767 mz_sql::catalog::ObjectType::Database,
768 )
769 .map(|item| item.mz_acl_item(owner_id));
770 let database_privileges: Vec<_> = merge_mz_acl_items(
771 database_owner_privileges
772 .into_iter()
773 .chain(database_default_privileges),
774 )
775 .collect();
776
777 let schema_owner_privileges = vec![rbac::owner_privilege(
778 mz_sql::catalog::ObjectType::Schema,
779 owner_id,
780 )];
781 let schema_default_privileges = state
782 .default_privileges
783 .get_applicable_privileges(
784 owner_id,
785 None,
786 None,
787 mz_sql::catalog::ObjectType::Schema,
788 )
789 .map(|item| item.mz_acl_item(owner_id))
790 .chain(std::iter::once(MzAclItem {
792 grantee: RoleId::Public,
793 grantor: owner_id,
794 acl_mode: AclMode::USAGE,
795 }));
796 let schema_privileges: Vec<_> = merge_mz_acl_items(
797 schema_owner_privileges
798 .into_iter()
799 .chain(schema_default_privileges),
800 )
801 .collect();
802
803 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
804 let (database_id, _) = tx.insert_user_database(
805 &name,
806 owner_id,
807 database_privileges.clone(),
808 &temporary_oids,
809 )?;
810 let (schema_id, _) = tx.insert_user_schema(
811 database_id,
812 DEFAULT_SCHEMA,
813 owner_id,
814 schema_privileges.clone(),
815 &temporary_oids,
816 )?;
817 CatalogState::add_to_audit_log(
818 &state.system_configuration,
819 oracle_write_ts,
820 session,
821 tx,
822 audit_events,
823 EventType::Create,
824 ObjectType::Database,
825 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
826 id: database_id.to_string(),
827 name: name.clone(),
828 }),
829 )?;
830 info!("create database {}", name);
831
832 CatalogState::add_to_audit_log(
833 &state.system_configuration,
834 oracle_write_ts,
835 session,
836 tx,
837 audit_events,
838 EventType::Create,
839 ObjectType::Schema,
840 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
841 id: schema_id.to_string(),
842 name: DEFAULT_SCHEMA.to_string(),
843 database_name: Some(name),
844 }),
845 )?;
846 }
847 Op::CreateSchema {
848 database_id,
849 schema_name,
850 owner_id,
851 } => {
852 if is_reserved_name(&schema_name) {
853 return Err(AdapterError::Catalog(Error::new(
854 ErrorKind::ReservedSchemaName(schema_name),
855 )));
856 }
857 let database_id = match database_id {
858 ResolvedDatabaseSpecifier::Id(id) => id,
859 ResolvedDatabaseSpecifier::Ambient => {
860 return Err(AdapterError::Catalog(Error::new(
861 ErrorKind::ReadOnlySystemSchema(schema_name),
862 )));
863 }
864 };
865 let owner_privileges = vec![rbac::owner_privilege(
866 mz_sql::catalog::ObjectType::Schema,
867 owner_id,
868 )];
869 let default_privileges = state
870 .default_privileges
871 .get_applicable_privileges(
872 owner_id,
873 Some(database_id),
874 None,
875 mz_sql::catalog::ObjectType::Schema,
876 )
877 .map(|item| item.mz_acl_item(owner_id));
878 let privileges: Vec<_> =
879 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
880 .collect();
881 let (schema_id, _) = tx.insert_user_schema(
882 database_id,
883 &schema_name,
884 owner_id,
885 privileges.clone(),
886 &state.get_temporary_oids().collect(),
887 )?;
888 CatalogState::add_to_audit_log(
889 &state.system_configuration,
890 oracle_write_ts,
891 session,
892 tx,
893 audit_events,
894 EventType::Create,
895 ObjectType::Schema,
896 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
897 id: schema_id.to_string(),
898 name: schema_name.clone(),
899 database_name: Some(state.database_by_id[&database_id].name.clone()),
900 }),
901 )?;
902 }
903 Op::CreateRole { name, attributes } => {
904 if is_reserved_role_name(&name) {
905 return Err(AdapterError::Catalog(Error::new(
906 ErrorKind::ReservedRoleName(name),
907 )));
908 }
909 let membership = RoleMembership::new();
910 let vars = RoleVars::default();
911 let (id, _) = tx.insert_user_role(
912 name.clone(),
913 attributes.clone(),
914 membership.clone(),
915 vars.clone(),
916 &state.get_temporary_oids().collect(),
917 )?;
918 CatalogState::add_to_audit_log(
919 &state.system_configuration,
920 oracle_write_ts,
921 session,
922 tx,
923 audit_events,
924 EventType::Create,
925 ObjectType::Role,
926 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
927 id: id.to_string(),
928 name: name.clone(),
929 }),
930 )?;
931 info!("create role {}", name);
932 }
933 Op::CreateCluster {
934 id,
935 name,
936 introspection_sources,
937 owner_id,
938 config,
939 } => {
940 if is_reserved_name(&name) {
941 return Err(AdapterError::Catalog(Error::new(
942 ErrorKind::ReservedClusterName(name),
943 )));
944 }
945 let owner_privileges = vec![rbac::owner_privilege(
946 mz_sql::catalog::ObjectType::Cluster,
947 owner_id,
948 )];
949 let default_privileges = state
950 .default_privileges
951 .get_applicable_privileges(
952 owner_id,
953 None,
954 None,
955 mz_sql::catalog::ObjectType::Cluster,
956 )
957 .map(|item| item.mz_acl_item(owner_id));
958 let privileges: Vec<_> =
959 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
960 .collect();
961 let introspection_source_ids: Vec<_> = introspection_sources
962 .iter()
963 .map(|introspection_source| {
964 Transaction::allocate_introspection_source_index_id(
965 &id,
966 introspection_source.variant,
967 )
968 })
969 .collect();
970
971 let introspection_sources = introspection_sources
972 .into_iter()
973 .zip_eq(introspection_source_ids)
974 .map(|(log, (item_id, gid))| (log, item_id, gid))
975 .collect();
976
977 tx.insert_user_cluster(
978 id,
979 &name,
980 introspection_sources,
981 owner_id,
982 privileges.clone(),
983 config.clone().into(),
984 &state.get_temporary_oids().collect(),
985 )?;
986 CatalogState::add_to_audit_log(
987 &state.system_configuration,
988 oracle_write_ts,
989 session,
990 tx,
991 audit_events,
992 EventType::Create,
993 ObjectType::Cluster,
994 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
995 id: id.to_string(),
996 name: name.clone(),
997 }),
998 )?;
999 info!("create cluster {}", name);
1000 }
1001 Op::CreateClusterReplica {
1002 cluster_id,
1003 name,
1004 config,
1005 owner_id,
1006 reason,
1007 } => {
1008 if is_reserved_name(&name) {
1009 return Err(AdapterError::Catalog(Error::new(
1010 ErrorKind::ReservedReplicaName(name),
1011 )));
1012 }
1013 let cluster = state.get_cluster(cluster_id);
1014 let id =
1015 tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1016 if let ReplicaLocation::Managed(ManagedReplicaLocation {
1017 size,
1018 billed_as,
1019 internal,
1020 ..
1021 }) = &config.location
1022 {
1023 let (reason, scheduling_policies) = reason.into_audit_log();
1024 let details = EventDetails::CreateClusterReplicaV4(
1025 mz_audit_log::CreateClusterReplicaV4 {
1026 cluster_id: cluster_id.to_string(),
1027 cluster_name: cluster.name.clone(),
1028 replica_id: Some(id.to_string()),
1029 replica_name: name.clone(),
1030 logical_size: size.clone(),
1031 billed_as: billed_as.clone(),
1032 internal: *internal,
1033 reason,
1034 scheduling_policies,
1035 },
1036 );
1037 CatalogState::add_to_audit_log(
1038 &state.system_configuration,
1039 oracle_write_ts,
1040 session,
1041 tx,
1042 audit_events,
1043 EventType::Create,
1044 ObjectType::ClusterReplica,
1045 details,
1046 )?;
1047 }
1048 }
1049 Op::CreateItem {
1050 id,
1051 name,
1052 item,
1053 owner_id,
1054 } => {
1055 state.check_unstable_dependencies(&item)?;
1056
1057 match &item {
1058 CatalogItem::Table(table) => {
1059 let gids: Vec<_> = table.global_ids().collect();
1060 assert_eq!(gids.len(), 1);
1061 storage_collections_to_create.extend(gids);
1062 }
1063 CatalogItem::Source(source) => {
1064 storage_collections_to_create.insert(source.global_id());
1065 }
1066 CatalogItem::MaterializedView(mv) => {
1067 storage_collections_to_create.insert(mv.global_id());
1068 }
1069 CatalogItem::ContinualTask(ct) => {
1070 storage_collections_to_create.insert(ct.global_id());
1071 }
1072 CatalogItem::Sink(sink) => {
1073 storage_collections_to_create.insert(sink.global_id());
1074 }
1075 CatalogItem::Log(_)
1076 | CatalogItem::View(_)
1077 | CatalogItem::Index(_)
1078 | CatalogItem::Type(_)
1079 | CatalogItem::Func(_)
1080 | CatalogItem::Secret(_)
1081 | CatalogItem::Connection(_) => (),
1082 }
1083
1084 let system_user = session.map_or(false, |s| s.user().is_system_user());
1085 if !system_user {
1086 if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1087 let cluster_name = state.clusters_by_id[&id].name.clone();
1088 return Err(AdapterError::Catalog(Error::new(
1089 ErrorKind::ReadOnlyCluster(cluster_name),
1090 )));
1091 }
1092 }
1093
1094 let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1095 let default_privileges = state
1096 .default_privileges
1097 .get_applicable_privileges(
1098 owner_id,
1099 name.qualifiers.database_spec.id(),
1100 Some(name.qualifiers.schema_spec.into()),
1101 item.typ().into(),
1102 )
1103 .map(|item| item.mz_acl_item(owner_id));
1104 let progress_source_privilege = if item.is_progress_source() {
1106 Some(MzAclItem {
1107 grantee: MZ_SUPPORT_ROLE_ID,
1108 grantor: owner_id,
1109 acl_mode: AclMode::SELECT,
1110 })
1111 } else {
1112 None
1113 };
1114 let privileges: Vec<_> = merge_mz_acl_items(
1115 owner_privileges
1116 .into_iter()
1117 .chain(default_privileges)
1118 .chain(progress_source_privilege),
1119 )
1120 .collect();
1121
1122 let temporary_oids = state.get_temporary_oids().collect();
1123
1124 if item.is_temporary() {
1125 if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1126 || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1127 {
1128 return Err(AdapterError::Catalog(Error::new(
1129 ErrorKind::InvalidTemporarySchema,
1130 )));
1131 }
1132 let oid = tx.allocate_oid(&temporary_oids)?;
1133 let item = TemporaryItem {
1134 id,
1135 oid,
1136 name: name.clone(),
1137 item: item.clone(),
1138 owner_id,
1139 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1140 };
1141 temporary_item_updates.push((item, StateDiff::Addition));
1142 } else {
1143 if let Some(temp_id) =
1144 item.uses()
1145 .iter()
1146 .find(|id| match state.try_get_entry(*id) {
1147 Some(entry) => entry.item().is_temporary(),
1148 None => temporary_ids.contains(id),
1149 })
1150 {
1151 let temp_item = state.get_entry(temp_id);
1152 return Err(AdapterError::Catalog(Error::new(
1153 ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1154 )));
1155 }
1156 if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1157 && !system_user
1158 {
1159 let schema_name = state
1160 .resolve_full_name(&name, session.map(|session| session.conn_id()))
1161 .schema;
1162 return Err(AdapterError::Catalog(Error::new(
1163 ErrorKind::ReadOnlySystemSchema(schema_name),
1164 )));
1165 }
1166 let schema_id = name.qualifiers.schema_spec.clone().into();
1167 let item_type = item.typ();
1168 let (create_sql, global_id, versions) = item.to_serialized();
1169 tx.insert_user_item(
1170 id,
1171 global_id,
1172 schema_id,
1173 &name.item,
1174 create_sql,
1175 owner_id,
1176 privileges.clone(),
1177 &temporary_oids,
1178 versions,
1179 )?;
1180 info!(
1181 "create {} {} ({})",
1182 item_type,
1183 state.resolve_full_name(&name, None),
1184 id
1185 );
1186 }
1187
1188 if Self::should_audit_log_item(&item) {
1189 let name = Self::full_name_detail(
1190 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1191 );
1192 let details = match &item {
1193 CatalogItem::Source(s) => {
1194 let cluster_id = match s.data_source {
1195 DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1198 match state.get_entry(&ingestion_id).cluster_id() {
1199 Some(cluster_id) => Some(cluster_id.to_string()),
1200 None => None,
1201 }
1202 }
1203 _ => match item.cluster_id() {
1204 Some(cluster_id) => Some(cluster_id.to_string()),
1205 None => None,
1206 },
1207 };
1208
1209 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1210 id: id.to_string(),
1211 cluster_id,
1212 name,
1213 external_type: s.source_type().to_string(),
1214 })
1215 }
1216 CatalogItem::Sink(s) => {
1217 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1218 id: id.to_string(),
1219 cluster_id: Some(s.cluster_id.to_string()),
1220 name,
1221 external_type: s.sink_type().to_string(),
1222 })
1223 }
1224 CatalogItem::Index(i) => {
1225 EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1226 id: id.to_string(),
1227 name,
1228 cluster_id: i.cluster_id.to_string(),
1229 })
1230 }
1231 CatalogItem::MaterializedView(mv) => {
1232 EventDetails::CreateMaterializedViewV1(
1233 mz_audit_log::CreateMaterializedViewV1 {
1234 id: id.to_string(),
1235 name,
1236 cluster_id: mv.cluster_id.to_string(),
1237 },
1238 )
1239 }
1240 _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1241 id: id.to_string(),
1242 name,
1243 }),
1244 };
1245 CatalogState::add_to_audit_log(
1246 &state.system_configuration,
1247 oracle_write_ts,
1248 session,
1249 tx,
1250 audit_events,
1251 EventType::Create,
1252 catalog_type_to_audit_object_type(item.typ()),
1253 details,
1254 )?;
1255 }
1256 }
1257 Op::CreateNetworkPolicy {
1258 rules,
1259 name,
1260 owner_id,
1261 } => {
1262 if state.network_policies_by_name.contains_key(&name) {
1263 return Err(AdapterError::PlanError(PlanError::Catalog(
1264 SqlCatalogError::NetworkPolicyAlreadyExists(name),
1265 )));
1266 }
1267 if is_reserved_name(&name) {
1268 return Err(AdapterError::Catalog(Error::new(
1269 ErrorKind::ReservedNetworkPolicyName(name),
1270 )));
1271 }
1272
1273 let owner_privileges = vec![rbac::owner_privilege(
1274 mz_sql::catalog::ObjectType::NetworkPolicy,
1275 owner_id,
1276 )];
1277 let default_privileges = state
1278 .default_privileges
1279 .get_applicable_privileges(
1280 owner_id,
1281 None,
1282 None,
1283 mz_sql::catalog::ObjectType::NetworkPolicy,
1284 )
1285 .map(|item| item.mz_acl_item(owner_id));
1286 let privileges: Vec<_> =
1287 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1288 .collect();
1289
1290 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1291 let id = tx.insert_user_network_policy(
1292 name.clone(),
1293 rules,
1294 privileges,
1295 owner_id,
1296 &temporary_oids,
1297 )?;
1298
1299 CatalogState::add_to_audit_log(
1300 &state.system_configuration,
1301 oracle_write_ts,
1302 session,
1303 tx,
1304 audit_events,
1305 EventType::Create,
1306 ObjectType::NetworkPolicy,
1307 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1308 id: id.to_string(),
1309 name: name.clone(),
1310 }),
1311 )?;
1312
1313 info!("created network policy {name} ({id})");
1314 }
1315 Op::Comment {
1316 object_id,
1317 sub_component,
1318 comment,
1319 } => {
1320 tx.update_comment(object_id, sub_component, comment)?;
1321 let entry = state.get_comment_id_entry(&object_id);
1322 let should_log = entry
1323 .map(|entry| Self::should_audit_log_item(entry.item()))
1324 .unwrap_or(true);
1326 if let (Some(conn_id), true) =
1329 (session.map(|session| session.conn_id()), should_log)
1330 {
1331 CatalogState::add_to_audit_log(
1332 &state.system_configuration,
1333 oracle_write_ts,
1334 session,
1335 tx,
1336 audit_events,
1337 EventType::Comment,
1338 comment_id_to_audit_object_type(object_id),
1339 EventDetails::IdNameV1(IdNameV1 {
1340 id: format!("{object_id:?}"),
1342 name: state.comment_id_to_audit_log_name(object_id, conn_id),
1343 }),
1344 )?;
1345 }
1346 }
1347 Op::UpdateSourceReferences {
1348 source_id,
1349 references,
1350 } => {
1351 tx.update_source_references(
1352 source_id,
1353 references
1354 .references
1355 .into_iter()
1356 .map(|reference| reference.into())
1357 .collect(),
1358 references.updated_at,
1359 )?;
1360 }
1361 Op::DropObjects(drop_object_infos) => {
1362 let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1364
1365 tx.drop_comments(&delta.comments)?;
1367
1368 let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1370 delta
1371 .items
1372 .iter()
1373 .map(|id| id)
1374 .partition(|id| !state.get_entry(*id).item().is_temporary());
1375 tx.remove_items(&durable_items_to_drop)?;
1376 temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1377 let entry = state.get_entry(&id);
1378 (entry.clone().into(), StateDiff::Retraction)
1379 }));
1380
1381 for item_id in delta.items {
1382 let entry = state.get_entry(&item_id);
1383
1384 if entry.item().is_storage_collection() {
1385 storage_collections_to_drop.extend(entry.global_ids());
1386 }
1387
1388 if state.source_references.contains_key(&item_id) {
1389 tx.remove_source_references(item_id)?;
1390 }
1391
1392 if Self::should_audit_log_item(entry.item()) {
1393 CatalogState::add_to_audit_log(
1394 &state.system_configuration,
1395 oracle_write_ts,
1396 session,
1397 tx,
1398 audit_events,
1399 EventType::Drop,
1400 catalog_type_to_audit_object_type(entry.item().typ()),
1401 EventDetails::IdFullNameV1(IdFullNameV1 {
1402 id: item_id.to_string(),
1403 name: Self::full_name_detail(&state.resolve_full_name(
1404 entry.name(),
1405 session.map(|session| session.conn_id()),
1406 )),
1407 }),
1408 )?;
1409 }
1410 info!(
1411 "drop {} {} ({})",
1412 entry.item_type(),
1413 state.resolve_full_name(entry.name(), entry.conn_id()),
1414 item_id
1415 );
1416 }
1417
1418 let schemas = delta
1420 .schemas
1421 .iter()
1422 .map(|(schema_spec, database_spec)| {
1423 (SchemaId::from(schema_spec), *database_spec)
1424 })
1425 .collect();
1426 tx.remove_schemas(&schemas)?;
1427
1428 for (schema_spec, database_spec) in delta.schemas {
1429 let schema = state.get_schema(
1430 &database_spec,
1431 &schema_spec,
1432 session
1433 .map(|session| session.conn_id())
1434 .unwrap_or(&SYSTEM_CONN_ID),
1435 );
1436
1437 let schema_id = SchemaId::from(schema_spec);
1438 let database_id = match database_spec {
1439 ResolvedDatabaseSpecifier::Ambient => None,
1440 ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1441 };
1442
1443 CatalogState::add_to_audit_log(
1444 &state.system_configuration,
1445 oracle_write_ts,
1446 session,
1447 tx,
1448 audit_events,
1449 EventType::Drop,
1450 ObjectType::Schema,
1451 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1452 id: schema_id.to_string(),
1453 name: schema.name.schema.to_string(),
1454 database_name: database_id
1455 .map(|database_id| state.database_by_id[&database_id].name.clone()),
1456 }),
1457 )?;
1458 }
1459
1460 tx.remove_databases(&delta.databases)?;
1462
1463 for database_id in delta.databases {
1464 let database = state.get_database(&database_id).clone();
1465
1466 CatalogState::add_to_audit_log(
1467 &state.system_configuration,
1468 oracle_write_ts,
1469 session,
1470 tx,
1471 audit_events,
1472 EventType::Drop,
1473 ObjectType::Database,
1474 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1475 id: database_id.to_string(),
1476 name: database.name.clone(),
1477 }),
1478 )?;
1479 }
1480
1481 tx.remove_user_roles(&delta.roles)?;
1483
1484 for role_id in delta.roles {
1485 let role = state
1486 .roles_by_id
1487 .get(&role_id)
1488 .expect("catalog out of sync");
1489
1490 CatalogState::add_to_audit_log(
1491 &state.system_configuration,
1492 oracle_write_ts,
1493 session,
1494 tx,
1495 audit_events,
1496 EventType::Drop,
1497 ObjectType::Role,
1498 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1499 id: role.id.to_string(),
1500 name: role.name.clone(),
1501 }),
1502 )?;
1503 info!("drop role {}", role.name());
1504 }
1505
1506 tx.remove_network_policies(&delta.network_policies)?;
1508
1509 for network_policy_id in delta.network_policies {
1510 let policy = state
1511 .network_policies_by_id
1512 .get(&network_policy_id)
1513 .expect("catalog out of sync");
1514
1515 CatalogState::add_to_audit_log(
1516 &state.system_configuration,
1517 oracle_write_ts,
1518 session,
1519 tx,
1520 audit_events,
1521 EventType::Drop,
1522 ObjectType::NetworkPolicy,
1523 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1524 id: policy.id.to_string(),
1525 name: policy.name.clone(),
1526 }),
1527 )?;
1528 info!("drop network policy {}", policy.name.clone());
1529 }
1530
1531 let replicas = delta.replicas.keys().copied().collect();
1533 tx.remove_cluster_replicas(&replicas)?;
1534
1535 for (replica_id, (cluster_id, reason)) in delta.replicas {
1536 let cluster = state.get_cluster(cluster_id);
1537 let replica = cluster.replica(replica_id).expect("Must exist");
1538
1539 let (reason, scheduling_policies) = reason.into_audit_log();
1540 let details =
1541 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1542 cluster_id: cluster_id.to_string(),
1543 cluster_name: cluster.name.clone(),
1544 replica_id: Some(replica_id.to_string()),
1545 replica_name: replica.name.clone(),
1546 reason,
1547 scheduling_policies,
1548 });
1549 CatalogState::add_to_audit_log(
1550 &state.system_configuration,
1551 oracle_write_ts,
1552 session,
1553 tx,
1554 audit_events,
1555 EventType::Drop,
1556 ObjectType::ClusterReplica,
1557 details,
1558 )?;
1559 }
1560
1561 tx.remove_clusters(&delta.clusters)?;
1563
1564 for cluster_id in delta.clusters {
1565 let cluster = state.get_cluster(cluster_id);
1566
1567 CatalogState::add_to_audit_log(
1568 &state.system_configuration,
1569 oracle_write_ts,
1570 session,
1571 tx,
1572 audit_events,
1573 EventType::Drop,
1574 ObjectType::Cluster,
1575 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1576 id: cluster.id.to_string(),
1577 name: cluster.name.clone(),
1578 }),
1579 )?;
1580 }
1581 }
1582 Op::GrantRole {
1583 role_id,
1584 member_id,
1585 grantor_id,
1586 } => {
1587 state.ensure_not_reserved_role(&member_id)?;
1588 state.ensure_grantable_role(&role_id)?;
1589 if state.collect_role_membership(&role_id).contains(&member_id) {
1590 let group_role = state.get_role(&role_id);
1591 let member_role = state.get_role(&member_id);
1592 return Err(AdapterError::Catalog(Error::new(
1593 ErrorKind::CircularRoleMembership {
1594 role_name: group_role.name().to_string(),
1595 member_name: member_role.name().to_string(),
1596 },
1597 )));
1598 }
1599 let mut member_role = state.get_role(&member_id).clone();
1600 member_role.membership.map.insert(role_id, grantor_id);
1601 tx.update_role(member_id, member_role.into())?;
1602
1603 CatalogState::add_to_audit_log(
1604 &state.system_configuration,
1605 oracle_write_ts,
1606 session,
1607 tx,
1608 audit_events,
1609 EventType::Grant,
1610 ObjectType::Role,
1611 EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1612 role_id: role_id.to_string(),
1613 member_id: member_id.to_string(),
1614 grantor_id: grantor_id.to_string(),
1615 executed_by: session
1616 .map(|session| session.authenticated_role_id())
1617 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1618 .to_string(),
1619 }),
1620 )?;
1621 }
1622 Op::RevokeRole {
1623 role_id,
1624 member_id,
1625 grantor_id,
1626 } => {
1627 state.ensure_not_reserved_role(&member_id)?;
1628 state.ensure_grantable_role(&role_id)?;
1629 let mut member_role = state.get_role(&member_id).clone();
1630 member_role.membership.map.remove(&role_id);
1631 tx.update_role(member_id, member_role.into())?;
1632
1633 CatalogState::add_to_audit_log(
1634 &state.system_configuration,
1635 oracle_write_ts,
1636 session,
1637 tx,
1638 audit_events,
1639 EventType::Revoke,
1640 ObjectType::Role,
1641 EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1642 role_id: role_id.to_string(),
1643 member_id: member_id.to_string(),
1644 grantor_id: grantor_id.to_string(),
1645 executed_by: session
1646 .map(|session| session.authenticated_role_id())
1647 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1648 .to_string(),
1649 }),
1650 )?;
1651 }
1652 Op::UpdatePrivilege {
1653 target_id,
1654 privilege,
1655 variant,
1656 } => {
1657 let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1658 UpdatePrivilegeVariant::Grant => {
1659 privileges.grant(privilege);
1660 }
1661 UpdatePrivilegeVariant::Revoke => {
1662 privileges.revoke(&privilege);
1663 }
1664 };
1665 match &target_id {
1666 SystemObjectId::Object(object_id) => match object_id {
1667 ObjectId::Cluster(id) => {
1668 let mut cluster = state.get_cluster(*id).clone();
1669 update_privilege_fn(&mut cluster.privileges);
1670 tx.update_cluster(*id, cluster.into())?;
1671 }
1672 ObjectId::Database(id) => {
1673 let mut database = state.get_database(id).clone();
1674 update_privilege_fn(&mut database.privileges);
1675 tx.update_database(*id, database.into())?;
1676 }
1677 ObjectId::NetworkPolicy(id) => {
1678 let mut policy = state.get_network_policy(id).clone();
1679 update_privilege_fn(&mut policy.privileges);
1680 tx.update_network_policy(*id, policy.into())?;
1681 }
1682 ObjectId::Schema((database_spec, schema_spec)) => {
1683 let schema_id = schema_spec.clone().into();
1684 let mut schema = state
1685 .get_schema(
1686 database_spec,
1687 schema_spec,
1688 session
1689 .map(|session| session.conn_id())
1690 .unwrap_or(&SYSTEM_CONN_ID),
1691 )
1692 .clone();
1693 update_privilege_fn(&mut schema.privileges);
1694 tx.update_schema(schema_id, schema.into())?;
1695 }
1696 ObjectId::Item(id) => {
1697 let entry = state.get_entry(id);
1698 let mut new_entry = entry.clone();
1699 update_privilege_fn(&mut new_entry.privileges);
1700 if !new_entry.item().is_temporary() {
1701 tx.update_item(*id, new_entry.into())?;
1702 } else {
1703 temporary_item_updates
1704 .push((entry.clone().into(), StateDiff::Retraction));
1705 temporary_item_updates
1706 .push((new_entry.into(), StateDiff::Addition));
1707 }
1708 }
1709 ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
1710 },
1711 SystemObjectId::System => {
1712 let mut system_privileges = state.system_privileges.clone();
1713 update_privilege_fn(&mut system_privileges);
1714 let new_privilege =
1715 system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
1716 tx.set_system_privilege(
1717 privilege.grantee,
1718 privilege.grantor,
1719 new_privilege.map(|new_privilege| new_privilege.acl_mode),
1720 )?;
1721 }
1722 }
1723 let object_type = state.get_system_object_type(&target_id);
1724 let object_id_str = match &target_id {
1725 SystemObjectId::System => "SYSTEM".to_string(),
1726 SystemObjectId::Object(id) => id.to_string(),
1727 };
1728 CatalogState::add_to_audit_log(
1729 &state.system_configuration,
1730 oracle_write_ts,
1731 session,
1732 tx,
1733 audit_events,
1734 variant.into(),
1735 system_object_type_to_audit_object_type(&object_type),
1736 EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
1737 object_id: object_id_str,
1738 grantee_id: privilege.grantee.to_string(),
1739 grantor_id: privilege.grantor.to_string(),
1740 privileges: privilege.acl_mode.to_string(),
1741 }),
1742 )?;
1743 }
1744 Op::UpdateDefaultPrivilege {
1745 privilege_object,
1746 privilege_acl_item,
1747 variant,
1748 } => {
1749 let mut default_privileges = state.default_privileges.clone();
1750 match variant {
1751 UpdatePrivilegeVariant::Grant => default_privileges
1752 .grant(privilege_object.clone(), privilege_acl_item.clone()),
1753 UpdatePrivilegeVariant::Revoke => {
1754 default_privileges.revoke(&privilege_object, &privilege_acl_item)
1755 }
1756 }
1757 let new_acl_mode = default_privileges
1758 .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
1759 tx.set_default_privilege(
1760 privilege_object.role_id,
1761 privilege_object.database_id,
1762 privilege_object.schema_id,
1763 privilege_object.object_type,
1764 privilege_acl_item.grantee,
1765 new_acl_mode.cloned(),
1766 )?;
1767 CatalogState::add_to_audit_log(
1768 &state.system_configuration,
1769 oracle_write_ts,
1770 session,
1771 tx,
1772 audit_events,
1773 variant.into(),
1774 object_type_to_audit_object_type(privilege_object.object_type),
1775 EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
1776 role_id: privilege_object.role_id.to_string(),
1777 database_id: privilege_object.database_id.map(|id| id.to_string()),
1778 schema_id: privilege_object.schema_id.map(|id| id.to_string()),
1779 grantee_id: privilege_acl_item.grantee.to_string(),
1780 privileges: privilege_acl_item.acl_mode.to_string(),
1781 }),
1782 )?;
1783 }
1784 Op::RenameCluster {
1785 id,
1786 name,
1787 to_name,
1788 check_reserved_names,
1789 } => {
1790 if id.is_system() {
1791 return Err(AdapterError::Catalog(Error::new(
1792 ErrorKind::ReadOnlyCluster(name.clone()),
1793 )));
1794 }
1795 if check_reserved_names && is_reserved_name(&to_name) {
1796 return Err(AdapterError::Catalog(Error::new(
1797 ErrorKind::ReservedClusterName(to_name),
1798 )));
1799 }
1800 tx.rename_cluster(id, &name, &to_name)?;
1801 CatalogState::add_to_audit_log(
1802 &state.system_configuration,
1803 oracle_write_ts,
1804 session,
1805 tx,
1806 audit_events,
1807 EventType::Alter,
1808 ObjectType::Cluster,
1809 EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
1810 id: id.to_string(),
1811 old_name: name.clone(),
1812 new_name: to_name.clone(),
1813 }),
1814 )?;
1815 info!("rename cluster {name} to {to_name}");
1816 }
1817 Op::RenameClusterReplica {
1818 cluster_id,
1819 replica_id,
1820 name,
1821 to_name,
1822 } => {
1823 if is_reserved_name(&to_name) {
1824 return Err(AdapterError::Catalog(Error::new(
1825 ErrorKind::ReservedReplicaName(to_name),
1826 )));
1827 }
1828 tx.rename_cluster_replica(replica_id, &name, &to_name)?;
1829 CatalogState::add_to_audit_log(
1830 &state.system_configuration,
1831 oracle_write_ts,
1832 session,
1833 tx,
1834 audit_events,
1835 EventType::Alter,
1836 ObjectType::ClusterReplica,
1837 EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
1838 cluster_id: cluster_id.to_string(),
1839 replica_id: replica_id.to_string(),
1840 old_name: name.replica.as_str().to_string(),
1841 new_name: to_name.clone(),
1842 }),
1843 )?;
1844 info!("rename cluster replica {name} to {to_name}");
1845 }
1846 Op::RenameItem {
1847 id,
1848 to_name,
1849 current_full_name,
1850 } => {
1851 let mut updates = Vec::new();
1852
1853 let entry = state.get_entry(&id);
1854 if let CatalogItem::Type(_) = entry.item() {
1855 return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
1856 current_full_name.to_string(),
1857 ))));
1858 }
1859
1860 if entry.id().is_system() {
1861 let name = state
1862 .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
1863 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
1864 name.to_string(),
1865 ))));
1866 }
1867
1868 let mut to_full_name = current_full_name.clone();
1869 to_full_name.item.clone_from(&to_name);
1870
1871 let mut to_qualified_name = entry.name().clone();
1872 to_qualified_name.item.clone_from(&to_name);
1873
1874 let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
1875 id: id.to_string(),
1876 old_name: Self::full_name_detail(¤t_full_name),
1877 new_name: Self::full_name_detail(&to_full_name),
1878 });
1879 if Self::should_audit_log_item(entry.item()) {
1880 CatalogState::add_to_audit_log(
1881 &state.system_configuration,
1882 oracle_write_ts,
1883 session,
1884 tx,
1885 audit_events,
1886 EventType::Alter,
1887 catalog_type_to_audit_object_type(entry.item().typ()),
1888 details,
1889 )?;
1890 }
1891
1892 let mut new_entry = entry.clone();
1894 new_entry.name.item.clone_from(&to_name);
1895 new_entry.item = entry
1896 .item()
1897 .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
1898 .map_err(|e| {
1899 Error::new(ErrorKind::from(AmbiguousRename {
1900 depender: state
1901 .resolve_full_name(entry.name(), entry.conn_id())
1902 .to_string(),
1903 dependee: state
1904 .resolve_full_name(entry.name(), entry.conn_id())
1905 .to_string(),
1906 message: e,
1907 }))
1908 })?;
1909
1910 for id in entry.referenced_by() {
1911 let dependent_item = state.get_entry(id);
1912 let mut to_entry = dependent_item.clone();
1913 to_entry.item = dependent_item
1914 .item()
1915 .rename_item_refs(
1916 current_full_name.clone(),
1917 to_full_name.item.clone(),
1918 false,
1919 )
1920 .map_err(|e| {
1921 Error::new(ErrorKind::from(AmbiguousRename {
1922 depender: state
1923 .resolve_full_name(
1924 dependent_item.name(),
1925 dependent_item.conn_id(),
1926 )
1927 .to_string(),
1928 dependee: state
1929 .resolve_full_name(entry.name(), entry.conn_id())
1930 .to_string(),
1931 message: e,
1932 }))
1933 })?;
1934
1935 if !to_entry.item().is_temporary() {
1936 tx.update_item(*id, to_entry.into())?;
1937 } else {
1938 temporary_item_updates
1939 .push((dependent_item.clone().into(), StateDiff::Retraction));
1940 temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
1941 }
1942 updates.push(*id);
1943 }
1944 if !new_entry.item().is_temporary() {
1945 tx.update_item(id, new_entry.into())?;
1946 } else {
1947 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
1948 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
1949 }
1950
1951 updates.push(id);
1952 for id in updates {
1953 Self::log_update(state, &id);
1954 }
1955 }
1956 Op::RenameSchema {
1957 database_spec,
1958 schema_spec,
1959 new_name,
1960 check_reserved_names,
1961 } => {
1962 if check_reserved_names && is_reserved_name(&new_name) {
1963 return Err(AdapterError::Catalog(Error::new(
1964 ErrorKind::ReservedSchemaName(new_name),
1965 )));
1966 }
1967
1968 let conn_id = session
1969 .map(|session| session.conn_id())
1970 .unwrap_or(&SYSTEM_CONN_ID);
1971
1972 let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
1973 let cur_name = schema.name().schema.clone();
1974
1975 let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
1976 return Err(AdapterError::Catalog(Error::new(
1977 ErrorKind::AmbientSchemaRename(cur_name),
1978 )));
1979 };
1980 let database = state.get_database(&database_id);
1981 let database_name = &database.name;
1982
1983 let mut updates = Vec::new();
1984 let mut items_to_update = BTreeMap::new();
1985
1986 let mut update_item = |id| {
1987 if items_to_update.contains_key(id) {
1988 return Ok(());
1989 }
1990
1991 let entry = state.get_entry(id);
1992
1993 let mut new_entry = entry.clone();
1995 new_entry.item = entry
1996 .item
1997 .rename_schema_refs(database_name, &cur_name, &new_name)
1998 .map_err(|(s, _i)| {
1999 Error::new(ErrorKind::from(AmbiguousRename {
2000 depender: state
2001 .resolve_full_name(entry.name(), entry.conn_id())
2002 .to_string(),
2003 dependee: format!("{database_name}.{cur_name}"),
2004 message: format!("ambiguous reference to schema named {s}"),
2005 }))
2006 })?;
2007
2008 if !new_entry.item().is_temporary() {
2010 items_to_update.insert(*id, new_entry.into());
2011 } else {
2012 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2013 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2014 }
2015 updates.push(id);
2016
2017 Ok::<_, AdapterError>(())
2018 };
2019
2020 for (_name, item_id) in &schema.items {
2022 update_item(item_id)?;
2024
2025 for id in state.get_entry(item_id).referenced_by() {
2027 update_item(id)?;
2028 }
2029 }
2030 tx.update_items(items_to_update)?;
2033
2034 let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2036 let schema_name = schema.name().schema.clone();
2037 return Err(AdapterError::Catalog(crate::catalog::Error::new(
2038 crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2039 )));
2040 };
2041
2042 let database_name = database_spec
2044 .id()
2045 .map(|id| state.get_database(&id).name.clone());
2046 let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2047 id: schema_id.to_string(),
2048 old_name: schema.name().schema.clone(),
2049 new_name: new_name.clone(),
2050 database_name,
2051 });
2052 CatalogState::add_to_audit_log(
2053 &state.system_configuration,
2054 oracle_write_ts,
2055 session,
2056 tx,
2057 audit_events,
2058 EventType::Alter,
2059 mz_audit_log::ObjectType::Schema,
2060 details,
2061 )?;
2062
2063 let mut new_schema = schema.clone();
2065 new_schema.name.schema.clone_from(&new_name);
2066 tx.update_schema(schema_id, new_schema.into())?;
2067
2068 for id in updates {
2069 Self::log_update(state, id);
2070 }
2071 }
2072 Op::UpdateOwner { id, new_owner } => {
2073 let conn_id = session
2074 .map(|session| session.conn_id())
2075 .unwrap_or(&SYSTEM_CONN_ID);
2076 let old_owner = state
2077 .get_owner_id(&id, conn_id)
2078 .expect("cannot update the owner of an object without an owner");
2079 match &id {
2080 ObjectId::Cluster(id) => {
2081 let mut cluster = state.get_cluster(*id).clone();
2082 if id.is_system() {
2083 return Err(AdapterError::Catalog(Error::new(
2084 ErrorKind::ReadOnlyCluster(cluster.name),
2085 )));
2086 }
2087 Self::update_privilege_owners(
2088 &mut cluster.privileges,
2089 cluster.owner_id,
2090 new_owner,
2091 );
2092 cluster.owner_id = new_owner;
2093 tx.update_cluster(*id, cluster.into())?;
2094 }
2095 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2096 let cluster = state.get_cluster(*cluster_id);
2097 let mut replica = cluster
2098 .replica(*replica_id)
2099 .expect("catalog out of sync")
2100 .clone();
2101 if replica_id.is_system() {
2102 return Err(AdapterError::Catalog(Error::new(
2103 ErrorKind::ReadOnlyClusterReplica(replica.name),
2104 )));
2105 }
2106 replica.owner_id = new_owner;
2107 tx.update_cluster_replica(*replica_id, replica.into())?;
2108 }
2109 ObjectId::Database(id) => {
2110 let mut database = state.get_database(id).clone();
2111 if id.is_system() {
2112 return Err(AdapterError::Catalog(Error::new(
2113 ErrorKind::ReadOnlyDatabase(database.name),
2114 )));
2115 }
2116 Self::update_privilege_owners(
2117 &mut database.privileges,
2118 database.owner_id,
2119 new_owner,
2120 );
2121 database.owner_id = new_owner;
2122 tx.update_database(*id, database.clone().into())?;
2123 }
2124 ObjectId::Schema((database_spec, schema_spec)) => {
2125 let schema_id: SchemaId = schema_spec.clone().into();
2126 let mut schema = state
2127 .get_schema(database_spec, schema_spec, conn_id)
2128 .clone();
2129 if schema_id.is_system() {
2130 let name = schema.name();
2131 let full_name = state.resolve_full_schema_name(name);
2132 return Err(AdapterError::Catalog(Error::new(
2133 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2134 )));
2135 }
2136 Self::update_privilege_owners(
2137 &mut schema.privileges,
2138 schema.owner_id,
2139 new_owner,
2140 );
2141 schema.owner_id = new_owner;
2142 tx.update_schema(schema_id, schema.into())?;
2143 }
2144 ObjectId::Item(id) => {
2145 let entry = state.get_entry(id);
2146 let mut new_entry = entry.clone();
2147 if id.is_system() {
2148 let full_name = state.resolve_full_name(
2149 new_entry.name(),
2150 session.map(|session| session.conn_id()),
2151 );
2152 return Err(AdapterError::Catalog(Error::new(
2153 ErrorKind::ReadOnlyItem(full_name.to_string()),
2154 )));
2155 }
2156 Self::update_privilege_owners(
2157 &mut new_entry.privileges,
2158 new_entry.owner_id,
2159 new_owner,
2160 );
2161 new_entry.owner_id = new_owner;
2162 if !new_entry.item().is_temporary() {
2163 tx.update_item(*id, new_entry.into())?;
2164 } else {
2165 temporary_item_updates
2166 .push((entry.clone().into(), StateDiff::Retraction));
2167 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2168 }
2169 }
2170 ObjectId::NetworkPolicy(id) => {
2171 let mut policy = state.get_network_policy(id).clone();
2172 if id.is_system() {
2173 return Err(AdapterError::Catalog(Error::new(
2174 ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2175 )));
2176 }
2177 Self::update_privilege_owners(
2178 &mut policy.privileges,
2179 policy.owner_id,
2180 new_owner,
2181 );
2182 policy.owner_id = new_owner;
2183 tx.update_network_policy(*id, policy.into())?;
2184 }
2185 ObjectId::Role(_) => unreachable!("roles have no owner"),
2186 }
2187 let object_type = state.get_object_type(&id);
2188 CatalogState::add_to_audit_log(
2189 &state.system_configuration,
2190 oracle_write_ts,
2191 session,
2192 tx,
2193 audit_events,
2194 EventType::Alter,
2195 object_type_to_audit_object_type(object_type),
2196 EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2197 object_id: id.to_string(),
2198 old_owner_id: old_owner.to_string(),
2199 new_owner_id: new_owner.to_string(),
2200 }),
2201 )?;
2202 }
2203 Op::UpdateClusterConfig { id, name, config } => {
2204 let mut cluster = state.get_cluster(id).clone();
2205 cluster.config = config;
2206 tx.update_cluster(id, cluster.into())?;
2207 info!("update cluster {}", name);
2208
2209 CatalogState::add_to_audit_log(
2210 &state.system_configuration,
2211 oracle_write_ts,
2212 session,
2213 tx,
2214 audit_events,
2215 EventType::Alter,
2216 ObjectType::Cluster,
2217 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2218 id: id.to_string(),
2219 name,
2220 }),
2221 )?;
2222 }
2223 Op::UpdateClusterReplicaConfig {
2224 replica_id,
2225 cluster_id,
2226 config,
2227 } => {
2228 let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2229 info!("update replica {}", replica.name);
2230 tx.update_cluster_replica(
2231 replica_id,
2232 mz_catalog::durable::ClusterReplica {
2233 cluster_id,
2234 replica_id,
2235 name: replica.name.clone(),
2236 config: config.clone().into(),
2237 owner_id: replica.owner_id,
2238 },
2239 )?;
2240 }
2241 Op::UpdateItem { id, name, to_item } => {
2242 let mut entry = state.get_entry(&id).clone();
2243 entry.name = name.clone();
2244 entry.item = to_item.clone();
2245 tx.update_item(id, entry.into())?;
2246
2247 if Self::should_audit_log_item(&to_item) {
2248 let mut full_name = Self::full_name_detail(
2249 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2250 );
2251 full_name.item = name.item;
2252
2253 CatalogState::add_to_audit_log(
2254 &state.system_configuration,
2255 oracle_write_ts,
2256 session,
2257 tx,
2258 audit_events,
2259 EventType::Alter,
2260 catalog_type_to_audit_object_type(to_item.typ()),
2261 EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2262 id: id.to_string(),
2263 name: full_name,
2264 }),
2265 )?;
2266 }
2267
2268 Self::log_update(state, &id);
2269 }
2270 Op::UpdateSystemConfiguration { name, value } => {
2271 let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2272 tx.upsert_system_config(&name, parsed_value.clone())?;
2273 if name == ENABLE_0DT_DEPLOYMENT.name() {
2277 let enable_0dt_deployment =
2278 strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2279 tx.set_enable_0dt_deployment(enable_0dt_deployment)?;
2280 } else if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2281 let with_0dt_deployment_max_wait =
2282 Duration::parse(VarInput::Flat(&parsed_value))
2283 .expect("parsing succeeded above");
2284 tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2285 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2286 let with_0dt_deployment_ddl_check_interval =
2287 Duration::parse(VarInput::Flat(&parsed_value))
2288 .expect("parsing succeeded above");
2289 tx.set_0dt_deployment_ddl_check_interval(
2290 with_0dt_deployment_ddl_check_interval,
2291 )?;
2292 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2293 let panic_after_timeout =
2294 strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2295 tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2296 }
2297
2298 CatalogState::add_to_audit_log(
2299 &state.system_configuration,
2300 oracle_write_ts,
2301 session,
2302 tx,
2303 audit_events,
2304 EventType::Alter,
2305 ObjectType::System,
2306 EventDetails::SetV1(mz_audit_log::SetV1 {
2307 name,
2308 value: Some(value.borrow().to_vec().join(", ")),
2309 }),
2310 )?;
2311 }
2312 Op::ResetSystemConfiguration { name } => {
2313 tx.remove_system_config(&name);
2314 if name == ENABLE_0DT_DEPLOYMENT.name() {
2318 tx.reset_enable_0dt_deployment()?;
2319 } else if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2320 tx.reset_0dt_deployment_max_wait()?;
2321 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2322 tx.reset_0dt_deployment_ddl_check_interval()?;
2323 }
2324
2325 CatalogState::add_to_audit_log(
2326 &state.system_configuration,
2327 oracle_write_ts,
2328 session,
2329 tx,
2330 audit_events,
2331 EventType::Alter,
2332 ObjectType::System,
2333 EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2334 )?;
2335 }
2336 Op::ResetAllSystemConfiguration => {
2337 tx.clear_system_configs();
2338 tx.reset_enable_0dt_deployment()?;
2339 tx.reset_0dt_deployment_max_wait()?;
2340 tx.reset_0dt_deployment_ddl_check_interval()?;
2341
2342 CatalogState::add_to_audit_log(
2343 &state.system_configuration,
2344 oracle_write_ts,
2345 session,
2346 tx,
2347 audit_events,
2348 EventType::Alter,
2349 ObjectType::System,
2350 EventDetails::ResetAllV1,
2351 )?;
2352 }
2353 Op::WeirdStorageUsageUpdates {
2354 object_id,
2355 size_bytes,
2356 collection_timestamp,
2357 } => {
2358 let id = tx.allocate_storage_usage_ids()?;
2359 let metric =
2360 VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2361 let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2362 let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2363 weird_builtin_table_update = Some(builtin_table_update);
2364 }
2365 };
2366 Ok((weird_builtin_table_update, temporary_item_updates))
2367 }
2368
2369 fn log_update(state: &CatalogState, id: &CatalogItemId) {
2370 let entry = state.get_entry(id);
2371 info!(
2372 "update {} {} ({})",
2373 entry.item_type(),
2374 state.resolve_full_name(entry.name(), entry.conn_id()),
2375 id
2376 );
2377 }
2378
2379 fn update_privilege_owners(
2383 privileges: &mut PrivilegeMap,
2384 old_owner: RoleId,
2385 new_owner: RoleId,
2386 ) {
2387 let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2389
2390 let mut new_present = false;
2391 for privilege in flat_privileges.iter_mut() {
2392 if privilege.grantor == old_owner {
2395 privilege.grantor = new_owner;
2396 } else if privilege.grantor == new_owner {
2397 new_present = true;
2398 }
2399 if privilege.grantee == old_owner {
2401 privilege.grantee = new_owner;
2402 } else if privilege.grantee == new_owner {
2403 new_present = true;
2404 }
2405 }
2406
2407 if new_present {
2411 let privilege_map: BTreeMap<_, Vec<_>> =
2413 flat_privileges
2414 .into_iter()
2415 .fold(BTreeMap::new(), |mut accum, privilege| {
2416 accum
2417 .entry((privilege.grantee, privilege.grantor))
2418 .or_default()
2419 .push(privilege);
2420 accum
2421 });
2422
2423 flat_privileges = privilege_map
2425 .into_iter()
2426 .map(|((grantee, grantor), values)|
2427 values.into_iter().fold(
2429 MzAclItem::empty(grantee, grantor),
2430 |mut accum, mz_aclitem| {
2431 accum.acl_mode =
2432 accum.acl_mode.union(mz_aclitem.acl_mode);
2433 accum
2434 },
2435 ))
2436 .collect();
2437 }
2438
2439 *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2440 }
2441}
2442
/// The catalog objects selected for removal, bucketed by object kind.
///
/// Populated by `ObjectsToDrop::generate`, which validates each requested
/// object and records it in the bucket matching its kind.
#[derive(Debug, Default)]
pub(crate) struct ObjectsToDrop {
    /// Comment ids of every object passed to `add_item`; the id is recorded
    /// before the object itself is validated.
    pub comments: BTreeSet<CommentObjectId>,
    /// Databases to drop; system databases are rejected before insertion.
    pub databases: BTreeSet<DatabaseId>,
    /// Schemas to drop, each mapped to the database specifier it was
    /// requested with; system schemas are rejected before insertion.
    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
    /// Clusters to drop; system clusters are rejected before insertion.
    pub clusters: BTreeSet<ClusterId>,
    /// Replicas to drop, each mapped to its owning cluster's id and the
    /// reason supplied with the drop request.
    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
    /// Roles to drop; system, predefined and reserved roles are rejected
    /// before insertion.
    pub roles: BTreeSet<RoleId>,
    /// Items to drop, kept in the order they were requested (this is the only
    /// bucket that is a `Vec`). System items are rejected before insertion.
    pub items: Vec<CatalogItemId>,
    /// Network policies to drop; system policies are rejected before
    /// insertion.
    pub network_policies: BTreeSet<NetworkPolicyId>,
}
2461
2462impl ObjectsToDrop {
2463 pub fn generate(
2464 drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2465 state: &CatalogState,
2466 session: Option<&ConnMeta>,
2467 ) -> Result<Self, AdapterError> {
2468 let mut delta = ObjectsToDrop::default();
2469
2470 for drop_object_info in drop_object_infos {
2471 delta.add_item(drop_object_info, state, session)?;
2472 }
2473
2474 Ok(delta)
2475 }
2476
2477 fn add_item(
2478 &mut self,
2479 drop_object_info: DropObjectInfo,
2480 state: &CatalogState,
2481 session: Option<&ConnMeta>,
2482 ) -> Result<(), AdapterError> {
2483 self.comments
2484 .insert(state.get_comment_id(drop_object_info.to_object_id()));
2485
2486 match drop_object_info {
2487 DropObjectInfo::Database(database_id) => {
2488 let database = &state.database_by_id[&database_id];
2489 if database_id.is_system() {
2490 return Err(AdapterError::Catalog(Error::new(
2491 ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2492 )));
2493 }
2494
2495 self.databases.insert(database_id);
2496 }
2497 DropObjectInfo::Schema((database_spec, schema_spec)) => {
2498 let schema = state.get_schema(
2499 &database_spec,
2500 &schema_spec,
2501 session
2502 .map(|session| session.conn_id())
2503 .unwrap_or(&SYSTEM_CONN_ID),
2504 );
2505 let schema_id: SchemaId = schema_spec.into();
2506 if schema_id.is_system() {
2507 let name = schema.name();
2508 let full_name = state.resolve_full_schema_name(name);
2509 return Err(AdapterError::Catalog(Error::new(
2510 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2511 )));
2512 }
2513
2514 self.schemas.insert(schema_spec, database_spec);
2515 }
2516 DropObjectInfo::Role(role_id) => {
2517 let name = state.get_role(&role_id).name().to_string();
2518 if role_id.is_system() || role_id.is_predefined() {
2519 return Err(AdapterError::Catalog(Error::new(
2520 ErrorKind::ReservedRoleName(name.clone()),
2521 )));
2522 }
2523 state.ensure_not_reserved_role(&role_id)?;
2524
2525 self.roles.insert(role_id);
2526 }
2527 DropObjectInfo::Cluster(cluster_id) => {
2528 let cluster = state.get_cluster(cluster_id);
2529 let name = &cluster.name;
2530 if cluster_id.is_system() {
2531 return Err(AdapterError::Catalog(Error::new(
2532 ErrorKind::ReadOnlyCluster(name.clone()),
2533 )));
2534 }
2535
2536 self.clusters.insert(cluster_id);
2537 }
2538 DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2539 let cluster = state.get_cluster(cluster_id);
2540 let replica = cluster.replica(replica_id).expect("Must exist");
2541
2542 self.replicas
2543 .insert(replica.replica_id, (cluster.id, reason));
2544 }
2545 DropObjectInfo::Item(item_id) => {
2546 let entry = state.get_entry(&item_id);
2547 if item_id.is_system() {
2548 let name = entry.name();
2549 let full_name =
2550 state.resolve_full_name(name, session.map(|session| session.conn_id()));
2551 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2552 full_name.to_string(),
2553 ))));
2554 }
2555
2556 self.items.push(item_id);
2557 }
2558 DropObjectInfo::NetworkPolicy(network_policy_id) => {
2559 let policy = state.get_network_policy(&network_policy_id);
2560 let name = &policy.name;
2561 if network_policy_id.is_system() {
2562 return Err(AdapterError::Catalog(Error::new(
2563 ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
2564 )));
2565 }
2566
2567 self.network_policies.insert(network_policy_id);
2568 }
2569 }
2570
2571 Ok(())
2572 }
2573}
2574
#[cfg(test)]
mod tests {
    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
    use mz_repr::role_id::RoleId;

    use crate::catalog::Catalog;

    /// Shorthand constructor for the ACL items used in the scenarios below.
    fn acl(grantee: RoleId, grantor: RoleId, acl_mode: AclMode) -> MzAclItem {
        MzAclItem {
            grantee,
            grantor,
            acl_mode,
        }
    }

    /// Checks that an ownership change rewrites references to the old owner
    /// and, where that creates duplicates, merges them into one privilege.
    #[mz_ore::test]
    fn test_update_privilege_owners() {
        let old_owner = RoleId::User(1);
        let new_owner = RoleId::User(2);
        let other_role = RoleId::User(3);

        // Each scenario is: a label for failure messages, the privileges
        // before the ownership change, and the single privilege expected
        // after it.
        let scenarios = [
            (
                "old and new owner both granted to the same grantee",
                vec![
                    acl(other_role, old_owner, AclMode::UPDATE),
                    acl(other_role, new_owner, AclMode::SELECT),
                ],
                acl(
                    other_role,
                    new_owner,
                    AclMode::SELECT.union(AclMode::UPDATE),
                ),
            ),
            (
                "old and new owner both received grants from the same grantor",
                vec![
                    acl(old_owner, other_role, AclMode::UPDATE),
                    acl(new_owner, other_role, AclMode::SELECT),
                ],
                acl(
                    new_owner,
                    other_role,
                    AclMode::SELECT.union(AclMode::UPDATE),
                ),
            ),
            (
                "old and new owner each hold a self-grant",
                vec![
                    acl(old_owner, old_owner, AclMode::UPDATE),
                    acl(new_owner, new_owner, AclMode::SELECT),
                ],
                acl(
                    new_owner,
                    new_owner,
                    AclMode::SELECT.union(AclMode::UPDATE),
                ),
            ),
            (
                // Exercises the path that skips consolidation entirely.
                "new owner not referenced before the change",
                vec![acl(other_role, old_owner, AclMode::UPDATE)],
                acl(other_role, new_owner, AclMode::UPDATE),
            ),
        ];

        for (scenario, initial, expected) in scenarios {
            let mut privileges = PrivilegeMap::from_mz_acl_items(initial);
            Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
            assert_eq!(
                1,
                privileges.all_values().count(),
                "scenario: {}",
                scenario
            );
            assert_eq!(
                vec![expected],
                privileges.all_values_owned().collect::<Vec<_>>(),
                "scenario: {}",
                scenario
            );
        }
    }
}