1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22 WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26 ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Transaction};
31use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
32use mz_catalog::memory::objects::{
33 CatalogItem, ClusterConfig, DataSourceDesc, SourceReferences, StateDiff, StateUpdate,
34 StateUpdateKind, TemporaryItem,
35};
36use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
37use mz_controller_types::{ClusterId, ReplicaId};
38use mz_ore::collections::HashSet;
39use mz_ore::instrument;
40use mz_ore::now::EpochMillis;
41use mz_persist_types::ShardId;
42use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
43use mz_repr::network_policy_id::NetworkPolicyId;
44use mz_repr::role_id::RoleId;
45use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
46use mz_sql::ast::RawDataType;
47use mz_sql::catalog::{
48 CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
49 CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, PasswordAction,
50 RoleAttributesRaw, RoleMembership, RoleVars,
51};
52use mz_sql::names::{
53 CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
54 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
55};
56use mz_sql::plan::{NetworkPolicyRule, PlanError};
57use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
58use mz_sql::session::vars::OwnedVarInput;
59use mz_sql::session::vars::{Value as VarValue, VarInput};
60use mz_sql::{DEFAULT_SCHEMA, rbac};
61use mz_sql_parser::ast::{QualifiedReplica, Value};
62use mz_storage_client::storage_collections::StorageCollections;
63use tracing::{info, trace};
64
65use crate::AdapterError;
66use crate::catalog::{
67 BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
68 catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
69 is_reserved_role_name, object_type_to_audit_object_type,
70 system_object_type_to_audit_object_type,
71};
72use crate::coord::ConnMeta;
73use crate::coord::cluster_scheduling::SchedulingDecision;
74use crate::util::ResultExt;
75
76#[derive(Debug, Clone)]
77pub enum Op {
78 AlterRetainHistory {
79 id: CatalogItemId,
80 value: Option<Value>,
81 window: CompactionWindow,
82 },
83 AlterRole {
84 id: RoleId,
85 name: String,
86 attributes: RoleAttributesRaw,
87 nopassword: bool,
88 vars: RoleVars,
89 },
90 AlterNetworkPolicy {
91 id: NetworkPolicyId,
92 rules: Vec<NetworkPolicyRule>,
93 name: String,
94 owner_id: RoleId,
95 },
96 AlterAddColumn {
97 id: CatalogItemId,
98 new_global_id: GlobalId,
99 name: ColumnName,
100 typ: SqlColumnType,
101 sql: RawDataType,
102 },
103 CreateDatabase {
104 name: String,
105 owner_id: RoleId,
106 },
107 CreateSchema {
108 database_id: ResolvedDatabaseSpecifier,
109 schema_name: String,
110 owner_id: RoleId,
111 },
112 CreateRole {
113 name: String,
114 attributes: RoleAttributesRaw,
115 },
116 CreateCluster {
117 id: ClusterId,
118 name: String,
119 introspection_sources: Vec<&'static BuiltinLog>,
120 owner_id: RoleId,
121 config: ClusterConfig,
122 },
123 CreateClusterReplica {
124 cluster_id: ClusterId,
125 name: String,
126 config: ReplicaConfig,
127 owner_id: RoleId,
128 reason: ReplicaCreateDropReason,
129 },
130 CreateItem {
131 id: CatalogItemId,
132 name: QualifiedItemName,
133 item: CatalogItem,
134 owner_id: RoleId,
135 },
136 CreateNetworkPolicy {
137 rules: Vec<NetworkPolicyRule>,
138 name: String,
139 owner_id: RoleId,
140 },
141 Comment {
142 object_id: CommentObjectId,
143 sub_component: Option<usize>,
144 comment: Option<String>,
145 },
146 DropObjects(Vec<DropObjectInfo>),
147 GrantRole {
148 role_id: RoleId,
149 member_id: RoleId,
150 grantor_id: RoleId,
151 },
152 RenameCluster {
153 id: ClusterId,
154 name: String,
155 to_name: String,
156 check_reserved_names: bool,
157 },
158 RenameClusterReplica {
159 cluster_id: ClusterId,
160 replica_id: ReplicaId,
161 name: QualifiedReplica,
162 to_name: String,
163 },
164 RenameItem {
165 id: CatalogItemId,
166 current_full_name: FullItemName,
167 to_name: String,
168 },
169 RenameSchema {
170 database_spec: ResolvedDatabaseSpecifier,
171 schema_spec: SchemaSpecifier,
172 new_name: String,
173 check_reserved_names: bool,
174 },
175 UpdateOwner {
176 id: ObjectId,
177 new_owner: RoleId,
178 },
179 UpdatePrivilege {
180 target_id: SystemObjectId,
181 privilege: MzAclItem,
182 variant: UpdatePrivilegeVariant,
183 },
184 UpdateDefaultPrivilege {
185 privilege_object: DefaultPrivilegeObject,
186 privilege_acl_item: DefaultPrivilegeAclItem,
187 variant: UpdatePrivilegeVariant,
188 },
189 RevokeRole {
190 role_id: RoleId,
191 member_id: RoleId,
192 grantor_id: RoleId,
193 },
194 UpdateClusterConfig {
195 id: ClusterId,
196 name: String,
197 config: ClusterConfig,
198 },
199 UpdateClusterReplicaConfig {
200 cluster_id: ClusterId,
201 replica_id: ReplicaId,
202 config: ReplicaConfig,
203 },
204 UpdateItem {
205 id: CatalogItemId,
206 name: QualifiedItemName,
207 to_item: CatalogItem,
208 },
209 UpdateSourceReferences {
210 source_id: CatalogItemId,
211 references: SourceReferences,
212 },
213 UpdateSystemConfiguration {
214 name: String,
215 value: OwnedVarInput,
216 },
217 ResetSystemConfiguration {
218 name: String,
219 },
220 ResetAllSystemConfiguration,
221 WeirdStorageUsageUpdates {
228 object_id: Option<String>,
229 size_bytes: u64,
230 collection_timestamp: EpochMillis,
231 },
232 TransactionDryRun,
238}
239
240#[derive(Debug, Clone)]
244pub enum DropObjectInfo {
245 Cluster(ClusterId),
246 ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
247 Database(DatabaseId),
248 Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
249 Role(RoleId),
250 Item(CatalogItemId),
251 NetworkPolicy(NetworkPolicyId),
252}
253
254impl DropObjectInfo {
255 pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
258 match id {
259 ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
260 ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
261 cluster_id,
262 replica_id,
263 ReplicaCreateDropReason::Manual,
264 )),
265 ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
266 ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
267 ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
268 ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
269 ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
270 }
271 }
272
273 fn to_object_id(&self) -> ObjectId {
276 match &self {
277 DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
278 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
279 ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
280 }
281 DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
282 DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
283 DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
284 DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
285 DropObjectInfo::NetworkPolicy(network_policy_id) => {
286 ObjectId::NetworkPolicy(network_policy_id.clone())
287 }
288 }
289 }
290}
291
292#[derive(Debug, Clone)]
294pub enum ReplicaCreateDropReason {
295 Manual,
300 ClusterScheduling(Vec<SchedulingDecision>),
303}
304
305impl ReplicaCreateDropReason {
306 pub fn into_audit_log(
307 self,
308 ) -> (
309 CreateOrDropClusterReplicaReasonV1,
310 Option<SchedulingDecisionsWithReasonsV2>,
311 ) {
312 let (reason, scheduling_policies) = match self {
313 ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
314 ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
315 CreateOrDropClusterReplicaReasonV1::Schedule,
316 Some(scheduling_decisions),
317 ),
318 };
319 (
320 reason,
321 scheduling_policies
322 .as_ref()
323 .map(SchedulingDecision::reasons_to_audit_log_reasons),
324 )
325 }
326}
327
328pub struct TransactionResult {
329 pub builtin_table_updates: Vec<BuiltinTableUpdate>,
330 pub audit_events: Vec<VersionedEvent>,
331}
332
333impl Catalog {
334 fn should_audit_log_item(item: &CatalogItem) -> bool {
335 !item.is_temporary()
336 }
337
338 fn temporary_ids(
341 &self,
342 ops: &[Op],
343 temporary_drops: BTreeSet<(&ConnectionId, String)>,
344 ) -> Result<BTreeSet<CatalogItemId>, Error> {
345 let mut creating = BTreeSet::new();
346 let mut temporary_ids = BTreeSet::new();
347 for op in ops.iter() {
348 if let Op::CreateItem {
349 id,
350 name,
351 item,
352 owner_id: _,
353 } = op
354 {
355 if let Some(conn_id) = item.conn_id() {
356 if self.item_exists_in_temp_schemas(conn_id, &name.item)
357 && !temporary_drops.contains(&(conn_id, name.item.clone()))
358 || creating.contains(&(conn_id, &name.item))
359 {
360 return Err(
361 SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
362 );
363 } else {
364 creating.insert((conn_id, &name.item));
365 temporary_ids.insert(id.clone());
366 }
367 }
368 }
369 }
370 Ok(temporary_ids)
371 }
372
373 #[instrument(name = "catalog::transact")]
374 pub async fn transact(
375 &mut self,
376 storage_collections: Option<
379 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
380 >,
381 oracle_write_ts: mz_repr::Timestamp,
382 session: Option<&ConnMeta>,
383 ops: Vec<Op>,
384 ) -> Result<TransactionResult, AdapterError> {
385 trace!("transact: {:?}", ops);
386 fail::fail_point!("catalog_transact", |arg| {
387 Err(AdapterError::Unstructured(anyhow::anyhow!(
388 "failpoint: {arg:?}"
389 )))
390 });
391
392 let drop_ids: BTreeSet<CatalogItemId> = ops
393 .iter()
394 .filter_map(|op| match op {
395 Op::DropObjects(drop_object_infos) => {
396 let ids = drop_object_infos.iter().map(|info| info.to_object_id());
397 let item_ids = ids.filter_map(|id| match id {
398 ObjectId::Item(id) => Some(id),
399 _ => None,
400 });
401 Some(item_ids)
402 }
403 _ => None,
404 })
405 .flatten()
406 .collect();
407 let temporary_drops = drop_ids
408 .iter()
409 .filter_map(|id| {
410 let entry = self.get_entry(id);
411 match entry.item().conn_id() {
412 Some(conn_id) => Some((conn_id, entry.name().item.clone())),
413 None => None,
414 }
415 })
416 .collect();
417 let dropped_global_ids = drop_ids
418 .iter()
419 .flat_map(|item_id| self.get_global_ids(item_id))
420 .collect();
421
422 let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
423 let mut builtin_table_updates = vec![];
424 let mut audit_events = vec![];
425 let mut storage = self.storage().await;
426 let mut tx = storage
427 .transaction()
428 .await
429 .unwrap_or_terminate("starting catalog transaction");
430
431 let new_state = Self::transact_inner(
432 storage_collections,
433 oracle_write_ts,
434 session,
435 ops,
436 temporary_ids,
437 &mut builtin_table_updates,
438 &mut audit_events,
439 &mut tx,
440 &self.state,
441 )
442 .await?;
443
444 tx.commit(oracle_write_ts)
449 .await
450 .unwrap_or_terminate("catalog storage transaction commit must succeed");
451
452 drop(storage);
455 if let Some(new_state) = new_state {
456 self.transient_revision += 1;
457 self.state = new_state;
458 }
459
460 let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
462 if self.state.system_config().enable_mz_notices() {
463 self.state().pack_optimizer_notices(
465 &mut builtin_table_updates,
466 dropped_notices.iter(),
467 Diff::MINUS_ONE,
468 );
469 }
470
471 Ok(TransactionResult {
472 builtin_table_updates,
473 audit_events,
474 })
475 }
476
477 #[instrument(name = "catalog::transact_inner")]
485 async fn transact_inner(
486 storage_collections: Option<
487 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
488 >,
489 oracle_write_ts: mz_repr::Timestamp,
490 session: Option<&ConnMeta>,
491 mut ops: Vec<Op>,
492 temporary_ids: BTreeSet<CatalogItemId>,
493 builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
494 audit_events: &mut Vec<VersionedEvent>,
495 tx: &mut Transaction<'_>,
496 state: &CatalogState,
497 ) -> Result<Option<CatalogState>, AdapterError> {
498 let mut state = Cow::Borrowed(state);
499
500 let dry_run_ops = match ops.last() {
501 Some(Op::TransactionDryRun) => {
502 ops.pop();
504 assert!(!ops.is_empty(), "TransactionDryRun must not be the only op");
505 ops.clone()
506 }
507 Some(_) => vec![],
508 None => return Ok(None),
509 };
510
511 let mut storage_collections_to_create = BTreeSet::new();
512 let mut storage_collections_to_drop = BTreeSet::new();
513 let mut storage_collections_to_register = BTreeMap::new();
514
515 for op in ops {
516 let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
517 oracle_write_ts,
518 session,
519 op,
520 &temporary_ids,
521 audit_events,
522 tx,
523 &*state,
524 &mut storage_collections_to_create,
525 &mut storage_collections_to_drop,
526 &mut storage_collections_to_register,
527 )
528 .await?;
529
530 if let Some(builtin_table_update) = weird_builtin_table_update {
539 builtin_table_updates.push(builtin_table_update);
540 }
541
542 let op_id = tx.op_id().into();
547 let temporary_item_updates =
548 temporary_item_updates
549 .into_iter()
550 .map(|(item, diff)| StateUpdate {
551 kind: StateUpdateKind::TemporaryItem(item),
552 ts: op_id,
553 diff,
554 });
555
556 let mut updates: Vec<_> = tx.get_and_commit_op_updates();
557 updates.extend(temporary_item_updates);
558 if !updates.is_empty() {
559 let op_builtin_table_updates = state.to_mut().apply_updates(updates)?;
560 let op_builtin_table_updates = state
561 .to_mut()
562 .resolve_builtin_table_updates(op_builtin_table_updates);
563 builtin_table_updates.extend(op_builtin_table_updates);
564 }
565 }
566
567 if dry_run_ops.is_empty() {
568 if let Some(c) = storage_collections {
570 c.prepare_state(
571 tx,
572 storage_collections_to_create,
573 storage_collections_to_drop,
574 storage_collections_to_register,
575 )
576 .await?;
577 }
578
579 let updates = tx.get_and_commit_op_updates();
580 if !updates.is_empty() {
581 let op_builtin_table_updates = state.to_mut().apply_updates(updates)?;
582 let op_builtin_table_updates = state
583 .to_mut()
584 .resolve_builtin_table_updates(op_builtin_table_updates);
585 builtin_table_updates.extend(op_builtin_table_updates);
586 }
587
588 match state {
589 Cow::Owned(state) => Ok(Some(state)),
590 Cow::Borrowed(_) => Ok(None),
591 }
592 } else {
593 Err(AdapterError::TransactionDryRun {
594 new_ops: dry_run_ops,
595 new_state: state.into_owned(),
596 })
597 }
598 }
599
600 #[instrument]
608 async fn transact_op(
609 oracle_write_ts: mz_repr::Timestamp,
610 session: Option<&ConnMeta>,
611 op: Op,
612 temporary_ids: &BTreeSet<CatalogItemId>,
613 audit_events: &mut Vec<VersionedEvent>,
614 tx: &mut Transaction<'_>,
615 state: &CatalogState,
616 storage_collections_to_create: &mut BTreeSet<GlobalId>,
617 storage_collections_to_drop: &mut BTreeSet<GlobalId>,
618 storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
619 ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
620 let mut weird_builtin_table_update = None;
621 let mut temporary_item_updates = Vec::new();
622
623 match op {
624 Op::TransactionDryRun => {
625 unreachable!("TransactionDryRun can only be used a final element of ops")
626 }
627 Op::AlterRetainHistory { id, value, window } => {
628 let entry = state.get_entry(&id);
629 if id.is_system() {
630 let name = entry.name();
631 let full_name =
632 state.resolve_full_name(name, session.map(|session| session.conn_id()));
633 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
634 full_name.to_string(),
635 ))));
636 }
637
638 let mut new_entry = entry.clone();
639 let previous = new_entry
640 .item
641 .update_retain_history(value.clone(), window)
642 .map_err(|_| {
643 AdapterError::Catalog(Error::new(ErrorKind::Internal(
644 "planner should have rejected invalid alter retain history item type"
645 .to_string(),
646 )))
647 })?;
648
649 if Self::should_audit_log_item(new_entry.item()) {
650 let details =
651 EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
652 id: id.to_string(),
653 old_history: previous.map(|previous| previous.to_string()),
654 new_history: value.map(|v| v.to_string()),
655 });
656 CatalogState::add_to_audit_log(
657 &state.system_configuration,
658 oracle_write_ts,
659 session,
660 tx,
661 audit_events,
662 EventType::Alter,
663 catalog_type_to_audit_object_type(new_entry.item().typ()),
664 details,
665 )?;
666 }
667
668 tx.update_item(id, new_entry.into())?;
669
670 Self::log_update(state, &id);
671 }
672 Op::AlterRole {
673 id,
674 name,
675 attributes,
676 nopassword,
677 vars,
678 } => {
679 state.ensure_not_reserved_role(&id)?;
680
681 let mut existing_role = state.get_role(&id).clone();
682 let password = attributes.password.clone();
683 existing_role.attributes = attributes.into();
684 existing_role.vars = vars;
685 let password_action = if nopassword {
686 PasswordAction::Clear
687 } else if let Some(password) = password {
688 PasswordAction::Set(password)
689 } else {
690 PasswordAction::NoChange
691 };
692 tx.update_role(id, existing_role.into(), password_action)?;
693
694 CatalogState::add_to_audit_log(
695 &state.system_configuration,
696 oracle_write_ts,
697 session,
698 tx,
699 audit_events,
700 EventType::Alter,
701 ObjectType::Role,
702 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
703 id: id.to_string(),
704 name: name.clone(),
705 }),
706 )?;
707
708 info!("update role {name} ({id})");
709 }
710 Op::AlterNetworkPolicy {
711 id,
712 rules,
713 name,
714 owner_id: _owner_id,
715 } => {
716 let existing_policy = state.get_network_policy(&id).clone();
717 let mut policy: NetworkPolicy = existing_policy.into();
718 policy.rules = rules;
719 if is_reserved_name(&name) {
720 return Err(AdapterError::Catalog(Error::new(
721 ErrorKind::ReservedNetworkPolicyName(name),
722 )));
723 }
724 tx.update_network_policy(id, policy.clone())?;
725
726 CatalogState::add_to_audit_log(
727 &state.system_configuration,
728 oracle_write_ts,
729 session,
730 tx,
731 audit_events,
732 EventType::Alter,
733 ObjectType::NetworkPolicy,
734 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
735 id: id.to_string(),
736 name: name.clone(),
737 }),
738 )?;
739
740 info!("update network policy {name} ({id})");
741 }
742 Op::AlterAddColumn {
743 id,
744 new_global_id,
745 name,
746 typ,
747 sql,
748 } => {
749 let mut new_entry = state.get_entry(&id).clone();
750 let version = new_entry.item.add_column(name, typ, sql)?;
751 let shard_id = state
754 .storage_metadata()
755 .get_collection_shard(new_entry.latest_global_id())?;
756
757 let CatalogItem::Table(table) = &mut new_entry.item else {
759 return Err(AdapterError::Unsupported("adding columns to non-Table"));
760 };
761 table.collections.insert(version, new_global_id);
762
763 tx.update_item(id, new_entry.into())?;
764 storage_collections_to_register.insert(new_global_id, shard_id);
765 }
766 Op::CreateDatabase { name, owner_id } => {
767 let database_owner_privileges = vec![rbac::owner_privilege(
768 mz_sql::catalog::ObjectType::Database,
769 owner_id,
770 )];
771 let database_default_privileges = state
772 .default_privileges
773 .get_applicable_privileges(
774 owner_id,
775 None,
776 None,
777 mz_sql::catalog::ObjectType::Database,
778 )
779 .map(|item| item.mz_acl_item(owner_id));
780 let database_privileges: Vec<_> = merge_mz_acl_items(
781 database_owner_privileges
782 .into_iter()
783 .chain(database_default_privileges),
784 )
785 .collect();
786
787 let schema_owner_privileges = vec![rbac::owner_privilege(
788 mz_sql::catalog::ObjectType::Schema,
789 owner_id,
790 )];
791 let schema_default_privileges = state
792 .default_privileges
793 .get_applicable_privileges(
794 owner_id,
795 None,
796 None,
797 mz_sql::catalog::ObjectType::Schema,
798 )
799 .map(|item| item.mz_acl_item(owner_id))
800 .chain(std::iter::once(MzAclItem {
802 grantee: RoleId::Public,
803 grantor: owner_id,
804 acl_mode: AclMode::USAGE,
805 }));
806 let schema_privileges: Vec<_> = merge_mz_acl_items(
807 schema_owner_privileges
808 .into_iter()
809 .chain(schema_default_privileges),
810 )
811 .collect();
812
813 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
814 let (database_id, _) = tx.insert_user_database(
815 &name,
816 owner_id,
817 database_privileges.clone(),
818 &temporary_oids,
819 )?;
820 let (schema_id, _) = tx.insert_user_schema(
821 database_id,
822 DEFAULT_SCHEMA,
823 owner_id,
824 schema_privileges.clone(),
825 &temporary_oids,
826 )?;
827 CatalogState::add_to_audit_log(
828 &state.system_configuration,
829 oracle_write_ts,
830 session,
831 tx,
832 audit_events,
833 EventType::Create,
834 ObjectType::Database,
835 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
836 id: database_id.to_string(),
837 name: name.clone(),
838 }),
839 )?;
840 info!("create database {}", name);
841
842 CatalogState::add_to_audit_log(
843 &state.system_configuration,
844 oracle_write_ts,
845 session,
846 tx,
847 audit_events,
848 EventType::Create,
849 ObjectType::Schema,
850 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
851 id: schema_id.to_string(),
852 name: DEFAULT_SCHEMA.to_string(),
853 database_name: Some(name),
854 }),
855 )?;
856 }
857 Op::CreateSchema {
858 database_id,
859 schema_name,
860 owner_id,
861 } => {
862 if is_reserved_name(&schema_name) {
863 return Err(AdapterError::Catalog(Error::new(
864 ErrorKind::ReservedSchemaName(schema_name),
865 )));
866 }
867 let database_id = match database_id {
868 ResolvedDatabaseSpecifier::Id(id) => id,
869 ResolvedDatabaseSpecifier::Ambient => {
870 return Err(AdapterError::Catalog(Error::new(
871 ErrorKind::ReadOnlySystemSchema(schema_name),
872 )));
873 }
874 };
875 let owner_privileges = vec![rbac::owner_privilege(
876 mz_sql::catalog::ObjectType::Schema,
877 owner_id,
878 )];
879 let default_privileges = state
880 .default_privileges
881 .get_applicable_privileges(
882 owner_id,
883 Some(database_id),
884 None,
885 mz_sql::catalog::ObjectType::Schema,
886 )
887 .map(|item| item.mz_acl_item(owner_id));
888 let privileges: Vec<_> =
889 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
890 .collect();
891 let (schema_id, _) = tx.insert_user_schema(
892 database_id,
893 &schema_name,
894 owner_id,
895 privileges.clone(),
896 &state.get_temporary_oids().collect(),
897 )?;
898 CatalogState::add_to_audit_log(
899 &state.system_configuration,
900 oracle_write_ts,
901 session,
902 tx,
903 audit_events,
904 EventType::Create,
905 ObjectType::Schema,
906 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
907 id: schema_id.to_string(),
908 name: schema_name.clone(),
909 database_name: Some(state.database_by_id[&database_id].name.clone()),
910 }),
911 )?;
912 }
913 Op::CreateRole { name, attributes } => {
914 if is_reserved_role_name(&name) {
915 return Err(AdapterError::Catalog(Error::new(
916 ErrorKind::ReservedRoleName(name),
917 )));
918 }
919 let membership = RoleMembership::new();
920 let vars = RoleVars::default();
921 let (id, _) = tx.insert_user_role(
922 name.clone(),
923 attributes.clone(),
924 membership.clone(),
925 vars.clone(),
926 &state.get_temporary_oids().collect(),
927 )?;
928 CatalogState::add_to_audit_log(
929 &state.system_configuration,
930 oracle_write_ts,
931 session,
932 tx,
933 audit_events,
934 EventType::Create,
935 ObjectType::Role,
936 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
937 id: id.to_string(),
938 name: name.clone(),
939 }),
940 )?;
941 info!("create role {}", name);
942 }
943 Op::CreateCluster {
944 id,
945 name,
946 introspection_sources,
947 owner_id,
948 config,
949 } => {
950 if is_reserved_name(&name) {
951 return Err(AdapterError::Catalog(Error::new(
952 ErrorKind::ReservedClusterName(name),
953 )));
954 }
955 let owner_privileges = vec![rbac::owner_privilege(
956 mz_sql::catalog::ObjectType::Cluster,
957 owner_id,
958 )];
959 let default_privileges = state
960 .default_privileges
961 .get_applicable_privileges(
962 owner_id,
963 None,
964 None,
965 mz_sql::catalog::ObjectType::Cluster,
966 )
967 .map(|item| item.mz_acl_item(owner_id));
968 let privileges: Vec<_> =
969 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
970 .collect();
971 let introspection_source_ids: Vec<_> = introspection_sources
972 .iter()
973 .map(|introspection_source| {
974 Transaction::allocate_introspection_source_index_id(
975 &id,
976 introspection_source.variant,
977 )
978 })
979 .collect();
980
981 let introspection_sources = introspection_sources
982 .into_iter()
983 .zip_eq(introspection_source_ids)
984 .map(|(log, (item_id, gid))| (log, item_id, gid))
985 .collect();
986
987 tx.insert_user_cluster(
988 id,
989 &name,
990 introspection_sources,
991 owner_id,
992 privileges.clone(),
993 config.clone().into(),
994 &state.get_temporary_oids().collect(),
995 )?;
996 CatalogState::add_to_audit_log(
997 &state.system_configuration,
998 oracle_write_ts,
999 session,
1000 tx,
1001 audit_events,
1002 EventType::Create,
1003 ObjectType::Cluster,
1004 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1005 id: id.to_string(),
1006 name: name.clone(),
1007 }),
1008 )?;
1009 info!("create cluster {}", name);
1010 }
1011 Op::CreateClusterReplica {
1012 cluster_id,
1013 name,
1014 config,
1015 owner_id,
1016 reason,
1017 } => {
1018 if is_reserved_name(&name) {
1019 return Err(AdapterError::Catalog(Error::new(
1020 ErrorKind::ReservedReplicaName(name),
1021 )));
1022 }
1023 let cluster = state.get_cluster(cluster_id);
1024 let id =
1025 tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1026 if let ReplicaLocation::Managed(ManagedReplicaLocation {
1027 size,
1028 billed_as,
1029 internal,
1030 ..
1031 }) = &config.location
1032 {
1033 let (reason, scheduling_policies) = reason.into_audit_log();
1034 let details = EventDetails::CreateClusterReplicaV4(
1035 mz_audit_log::CreateClusterReplicaV4 {
1036 cluster_id: cluster_id.to_string(),
1037 cluster_name: cluster.name.clone(),
1038 replica_id: Some(id.to_string()),
1039 replica_name: name.clone(),
1040 logical_size: size.clone(),
1041 billed_as: billed_as.clone(),
1042 internal: *internal,
1043 reason,
1044 scheduling_policies,
1045 },
1046 );
1047 CatalogState::add_to_audit_log(
1048 &state.system_configuration,
1049 oracle_write_ts,
1050 session,
1051 tx,
1052 audit_events,
1053 EventType::Create,
1054 ObjectType::ClusterReplica,
1055 details,
1056 )?;
1057 }
1058 }
1059 Op::CreateItem {
1060 id,
1061 name,
1062 item,
1063 owner_id,
1064 } => {
1065 state.check_unstable_dependencies(&item)?;
1066
1067 match &item {
1068 CatalogItem::Table(table) => {
1069 let gids: Vec<_> = table.global_ids().collect();
1070 assert_eq!(gids.len(), 1);
1071 storage_collections_to_create.extend(gids);
1072 }
1073 CatalogItem::Source(source) => {
1074 storage_collections_to_create.insert(source.global_id());
1075 }
1076 CatalogItem::MaterializedView(mv) => {
1077 storage_collections_to_create.insert(mv.global_id());
1078 }
1079 CatalogItem::ContinualTask(ct) => {
1080 storage_collections_to_create.insert(ct.global_id());
1081 }
1082 CatalogItem::Sink(sink) => {
1083 storage_collections_to_create.insert(sink.global_id());
1084 }
1085 CatalogItem::Log(_)
1086 | CatalogItem::View(_)
1087 | CatalogItem::Index(_)
1088 | CatalogItem::Type(_)
1089 | CatalogItem::Func(_)
1090 | CatalogItem::Secret(_)
1091 | CatalogItem::Connection(_) => (),
1092 }
1093
1094 let system_user = session.map_or(false, |s| s.user().is_system_user());
1095 if !system_user {
1096 if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1097 let cluster_name = state.clusters_by_id[&id].name.clone();
1098 return Err(AdapterError::Catalog(Error::new(
1099 ErrorKind::ReadOnlyCluster(cluster_name),
1100 )));
1101 }
1102 }
1103
1104 let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1105 let default_privileges = state
1106 .default_privileges
1107 .get_applicable_privileges(
1108 owner_id,
1109 name.qualifiers.database_spec.id(),
1110 Some(name.qualifiers.schema_spec.into()),
1111 item.typ().into(),
1112 )
1113 .map(|item| item.mz_acl_item(owner_id));
1114 let progress_source_privilege = if item.is_progress_source() {
1116 Some(MzAclItem {
1117 grantee: MZ_SUPPORT_ROLE_ID,
1118 grantor: owner_id,
1119 acl_mode: AclMode::SELECT,
1120 })
1121 } else {
1122 None
1123 };
1124 let privileges: Vec<_> = merge_mz_acl_items(
1125 owner_privileges
1126 .into_iter()
1127 .chain(default_privileges)
1128 .chain(progress_source_privilege),
1129 )
1130 .collect();
1131
1132 let temporary_oids = state.get_temporary_oids().collect();
1133
1134 if item.is_temporary() {
1135 if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1136 || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1137 {
1138 return Err(AdapterError::Catalog(Error::new(
1139 ErrorKind::InvalidTemporarySchema,
1140 )));
1141 }
1142 let oid = tx.allocate_oid(&temporary_oids)?;
1143 let item = TemporaryItem {
1144 id,
1145 oid,
1146 name: name.clone(),
1147 item: item.clone(),
1148 owner_id,
1149 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1150 };
1151 temporary_item_updates.push((item, StateDiff::Addition));
1152 } else {
1153 if let Some(temp_id) =
1154 item.uses()
1155 .iter()
1156 .find(|id| match state.try_get_entry(*id) {
1157 Some(entry) => entry.item().is_temporary(),
1158 None => temporary_ids.contains(id),
1159 })
1160 {
1161 let temp_item = state.get_entry(temp_id);
1162 return Err(AdapterError::Catalog(Error::new(
1163 ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1164 )));
1165 }
1166 if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1167 && !system_user
1168 {
1169 let schema_name = state
1170 .resolve_full_name(&name, session.map(|session| session.conn_id()))
1171 .schema;
1172 return Err(AdapterError::Catalog(Error::new(
1173 ErrorKind::ReadOnlySystemSchema(schema_name),
1174 )));
1175 }
1176 let schema_id = name.qualifiers.schema_spec.clone().into();
1177 let item_type = item.typ();
1178 let (create_sql, global_id, versions) = item.to_serialized();
1179 tx.insert_user_item(
1180 id,
1181 global_id,
1182 schema_id,
1183 &name.item,
1184 create_sql,
1185 owner_id,
1186 privileges.clone(),
1187 &temporary_oids,
1188 versions,
1189 )?;
1190 info!(
1191 "create {} {} ({})",
1192 item_type,
1193 state.resolve_full_name(&name, None),
1194 id
1195 );
1196 }
1197
1198 if Self::should_audit_log_item(&item) {
1199 let name = Self::full_name_detail(
1200 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1201 );
1202 let details = match &item {
1203 CatalogItem::Source(s) => {
1204 let cluster_id = match s.data_source {
1205 DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1208 match state.get_entry(&ingestion_id).cluster_id() {
1209 Some(cluster_id) => Some(cluster_id.to_string()),
1210 None => None,
1211 }
1212 }
1213 _ => match item.cluster_id() {
1214 Some(cluster_id) => Some(cluster_id.to_string()),
1215 None => None,
1216 },
1217 };
1218
1219 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1220 id: id.to_string(),
1221 cluster_id,
1222 name,
1223 external_type: s.source_type().to_string(),
1224 })
1225 }
1226 CatalogItem::Sink(s) => {
1227 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1228 id: id.to_string(),
1229 cluster_id: Some(s.cluster_id.to_string()),
1230 name,
1231 external_type: s.sink_type().to_string(),
1232 })
1233 }
1234 CatalogItem::Index(i) => {
1235 EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1236 id: id.to_string(),
1237 name,
1238 cluster_id: i.cluster_id.to_string(),
1239 })
1240 }
1241 CatalogItem::MaterializedView(mv) => {
1242 EventDetails::CreateMaterializedViewV1(
1243 mz_audit_log::CreateMaterializedViewV1 {
1244 id: id.to_string(),
1245 name,
1246 cluster_id: mv.cluster_id.to_string(),
1247 },
1248 )
1249 }
1250 _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1251 id: id.to_string(),
1252 name,
1253 }),
1254 };
1255 CatalogState::add_to_audit_log(
1256 &state.system_configuration,
1257 oracle_write_ts,
1258 session,
1259 tx,
1260 audit_events,
1261 EventType::Create,
1262 catalog_type_to_audit_object_type(item.typ()),
1263 details,
1264 )?;
1265 }
1266 }
1267 Op::CreateNetworkPolicy {
1268 rules,
1269 name,
1270 owner_id,
1271 } => {
1272 if state.network_policies_by_name.contains_key(&name) {
1273 return Err(AdapterError::PlanError(PlanError::Catalog(
1274 SqlCatalogError::NetworkPolicyAlreadyExists(name),
1275 )));
1276 }
1277 if is_reserved_name(&name) {
1278 return Err(AdapterError::Catalog(Error::new(
1279 ErrorKind::ReservedNetworkPolicyName(name),
1280 )));
1281 }
1282
1283 let owner_privileges = vec![rbac::owner_privilege(
1284 mz_sql::catalog::ObjectType::NetworkPolicy,
1285 owner_id,
1286 )];
1287 let default_privileges = state
1288 .default_privileges
1289 .get_applicable_privileges(
1290 owner_id,
1291 None,
1292 None,
1293 mz_sql::catalog::ObjectType::NetworkPolicy,
1294 )
1295 .map(|item| item.mz_acl_item(owner_id));
1296 let privileges: Vec<_> =
1297 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1298 .collect();
1299
1300 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1301 let id = tx.insert_user_network_policy(
1302 name.clone(),
1303 rules,
1304 privileges,
1305 owner_id,
1306 &temporary_oids,
1307 )?;
1308
1309 CatalogState::add_to_audit_log(
1310 &state.system_configuration,
1311 oracle_write_ts,
1312 session,
1313 tx,
1314 audit_events,
1315 EventType::Create,
1316 ObjectType::NetworkPolicy,
1317 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1318 id: id.to_string(),
1319 name: name.clone(),
1320 }),
1321 )?;
1322
1323 info!("created network policy {name} ({id})");
1324 }
1325 Op::Comment {
1326 object_id,
1327 sub_component,
1328 comment,
1329 } => {
1330 tx.update_comment(object_id, sub_component, comment)?;
1331 let entry = state.get_comment_id_entry(&object_id);
1332 let should_log = entry
1333 .map(|entry| Self::should_audit_log_item(entry.item()))
1334 .unwrap_or(true);
1336 if let (Some(conn_id), true) =
1339 (session.map(|session| session.conn_id()), should_log)
1340 {
1341 CatalogState::add_to_audit_log(
1342 &state.system_configuration,
1343 oracle_write_ts,
1344 session,
1345 tx,
1346 audit_events,
1347 EventType::Comment,
1348 comment_id_to_audit_object_type(object_id),
1349 EventDetails::IdNameV1(IdNameV1 {
1350 id: format!("{object_id:?}"),
1352 name: state.comment_id_to_audit_log_name(object_id, conn_id),
1353 }),
1354 )?;
1355 }
1356 }
1357 Op::UpdateSourceReferences {
1358 source_id,
1359 references,
1360 } => {
1361 tx.update_source_references(
1362 source_id,
1363 references
1364 .references
1365 .into_iter()
1366 .map(|reference| reference.into())
1367 .collect(),
1368 references.updated_at,
1369 )?;
1370 }
1371 Op::DropObjects(drop_object_infos) => {
1372 let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1374
1375 tx.drop_comments(&delta.comments)?;
1377
1378 let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1380 delta
1381 .items
1382 .iter()
1383 .map(|id| id)
1384 .partition(|id| !state.get_entry(*id).item().is_temporary());
1385 tx.remove_items(&durable_items_to_drop)?;
1386 temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1387 let entry = state.get_entry(&id);
1388 (entry.clone().into(), StateDiff::Retraction)
1389 }));
1390
1391 for item_id in delta.items {
1392 let entry = state.get_entry(&item_id);
1393
1394 if entry.item().is_storage_collection() {
1395 storage_collections_to_drop.extend(entry.global_ids());
1396 }
1397
1398 if state.source_references.contains_key(&item_id) {
1399 tx.remove_source_references(item_id)?;
1400 }
1401
1402 if Self::should_audit_log_item(entry.item()) {
1403 CatalogState::add_to_audit_log(
1404 &state.system_configuration,
1405 oracle_write_ts,
1406 session,
1407 tx,
1408 audit_events,
1409 EventType::Drop,
1410 catalog_type_to_audit_object_type(entry.item().typ()),
1411 EventDetails::IdFullNameV1(IdFullNameV1 {
1412 id: item_id.to_string(),
1413 name: Self::full_name_detail(&state.resolve_full_name(
1414 entry.name(),
1415 session.map(|session| session.conn_id()),
1416 )),
1417 }),
1418 )?;
1419 }
1420 info!(
1421 "drop {} {} ({})",
1422 entry.item_type(),
1423 state.resolve_full_name(entry.name(), entry.conn_id()),
1424 item_id
1425 );
1426 }
1427
1428 let schemas = delta
1430 .schemas
1431 .iter()
1432 .map(|(schema_spec, database_spec)| {
1433 (SchemaId::from(schema_spec), *database_spec)
1434 })
1435 .collect();
1436 tx.remove_schemas(&schemas)?;
1437
1438 for (schema_spec, database_spec) in delta.schemas {
1439 let schema = state.get_schema(
1440 &database_spec,
1441 &schema_spec,
1442 session
1443 .map(|session| session.conn_id())
1444 .unwrap_or(&SYSTEM_CONN_ID),
1445 );
1446
1447 let schema_id = SchemaId::from(schema_spec);
1448 let database_id = match database_spec {
1449 ResolvedDatabaseSpecifier::Ambient => None,
1450 ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1451 };
1452
1453 CatalogState::add_to_audit_log(
1454 &state.system_configuration,
1455 oracle_write_ts,
1456 session,
1457 tx,
1458 audit_events,
1459 EventType::Drop,
1460 ObjectType::Schema,
1461 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1462 id: schema_id.to_string(),
1463 name: schema.name.schema.to_string(),
1464 database_name: database_id
1465 .map(|database_id| state.database_by_id[&database_id].name.clone()),
1466 }),
1467 )?;
1468 }
1469
1470 tx.remove_databases(&delta.databases)?;
1472
1473 for database_id in delta.databases {
1474 let database = state.get_database(&database_id).clone();
1475
1476 CatalogState::add_to_audit_log(
1477 &state.system_configuration,
1478 oracle_write_ts,
1479 session,
1480 tx,
1481 audit_events,
1482 EventType::Drop,
1483 ObjectType::Database,
1484 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1485 id: database_id.to_string(),
1486 name: database.name.clone(),
1487 }),
1488 )?;
1489 }
1490
1491 tx.remove_user_roles(&delta.roles)?;
1493
1494 for role_id in delta.roles {
1495 let role = state
1496 .roles_by_id
1497 .get(&role_id)
1498 .expect("catalog out of sync");
1499
1500 CatalogState::add_to_audit_log(
1501 &state.system_configuration,
1502 oracle_write_ts,
1503 session,
1504 tx,
1505 audit_events,
1506 EventType::Drop,
1507 ObjectType::Role,
1508 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1509 id: role.id.to_string(),
1510 name: role.name.clone(),
1511 }),
1512 )?;
1513 info!("drop role {}", role.name());
1514 }
1515
1516 tx.remove_network_policies(&delta.network_policies)?;
1518
1519 for network_policy_id in delta.network_policies {
1520 let policy = state
1521 .network_policies_by_id
1522 .get(&network_policy_id)
1523 .expect("catalog out of sync");
1524
1525 CatalogState::add_to_audit_log(
1526 &state.system_configuration,
1527 oracle_write_ts,
1528 session,
1529 tx,
1530 audit_events,
1531 EventType::Drop,
1532 ObjectType::NetworkPolicy,
1533 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1534 id: policy.id.to_string(),
1535 name: policy.name.clone(),
1536 }),
1537 )?;
1538 info!("drop network policy {}", policy.name.clone());
1539 }
1540
1541 let replicas = delta.replicas.keys().copied().collect();
1543 tx.remove_cluster_replicas(&replicas)?;
1544
1545 for (replica_id, (cluster_id, reason)) in delta.replicas {
1546 let cluster = state.get_cluster(cluster_id);
1547 let replica = cluster.replica(replica_id).expect("Must exist");
1548
1549 let (reason, scheduling_policies) = reason.into_audit_log();
1550 let details =
1551 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1552 cluster_id: cluster_id.to_string(),
1553 cluster_name: cluster.name.clone(),
1554 replica_id: Some(replica_id.to_string()),
1555 replica_name: replica.name.clone(),
1556 reason,
1557 scheduling_policies,
1558 });
1559 CatalogState::add_to_audit_log(
1560 &state.system_configuration,
1561 oracle_write_ts,
1562 session,
1563 tx,
1564 audit_events,
1565 EventType::Drop,
1566 ObjectType::ClusterReplica,
1567 details,
1568 )?;
1569 }
1570
1571 tx.remove_clusters(&delta.clusters)?;
1573
1574 for cluster_id in delta.clusters {
1575 let cluster = state.get_cluster(cluster_id);
1576
1577 CatalogState::add_to_audit_log(
1578 &state.system_configuration,
1579 oracle_write_ts,
1580 session,
1581 tx,
1582 audit_events,
1583 EventType::Drop,
1584 ObjectType::Cluster,
1585 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1586 id: cluster.id.to_string(),
1587 name: cluster.name.clone(),
1588 }),
1589 )?;
1590 }
1591 }
1592 Op::GrantRole {
1593 role_id,
1594 member_id,
1595 grantor_id,
1596 } => {
1597 state.ensure_not_reserved_role(&member_id)?;
1598 state.ensure_grantable_role(&role_id)?;
1599 if state.collect_role_membership(&role_id).contains(&member_id) {
1600 let group_role = state.get_role(&role_id);
1601 let member_role = state.get_role(&member_id);
1602 return Err(AdapterError::Catalog(Error::new(
1603 ErrorKind::CircularRoleMembership {
1604 role_name: group_role.name().to_string(),
1605 member_name: member_role.name().to_string(),
1606 },
1607 )));
1608 }
1609 let mut member_role = state.get_role(&member_id).clone();
1610 member_role.membership.map.insert(role_id, grantor_id);
1611 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1612
1613 CatalogState::add_to_audit_log(
1614 &state.system_configuration,
1615 oracle_write_ts,
1616 session,
1617 tx,
1618 audit_events,
1619 EventType::Grant,
1620 ObjectType::Role,
1621 EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1622 role_id: role_id.to_string(),
1623 member_id: member_id.to_string(),
1624 grantor_id: grantor_id.to_string(),
1625 executed_by: session
1626 .map(|session| session.authenticated_role_id())
1627 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1628 .to_string(),
1629 }),
1630 )?;
1631 }
1632 Op::RevokeRole {
1633 role_id,
1634 member_id,
1635 grantor_id,
1636 } => {
1637 state.ensure_not_reserved_role(&member_id)?;
1638 state.ensure_grantable_role(&role_id)?;
1639 let mut member_role = state.get_role(&member_id).clone();
1640 member_role.membership.map.remove(&role_id);
1641 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1642
1643 CatalogState::add_to_audit_log(
1644 &state.system_configuration,
1645 oracle_write_ts,
1646 session,
1647 tx,
1648 audit_events,
1649 EventType::Revoke,
1650 ObjectType::Role,
1651 EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1652 role_id: role_id.to_string(),
1653 member_id: member_id.to_string(),
1654 grantor_id: grantor_id.to_string(),
1655 executed_by: session
1656 .map(|session| session.authenticated_role_id())
1657 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1658 .to_string(),
1659 }),
1660 )?;
1661 }
1662 Op::UpdatePrivilege {
1663 target_id,
1664 privilege,
1665 variant,
1666 } => {
1667 let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1668 UpdatePrivilegeVariant::Grant => {
1669 privileges.grant(privilege);
1670 }
1671 UpdatePrivilegeVariant::Revoke => {
1672 privileges.revoke(&privilege);
1673 }
1674 };
1675 match &target_id {
1676 SystemObjectId::Object(object_id) => match object_id {
1677 ObjectId::Cluster(id) => {
1678 let mut cluster = state.get_cluster(*id).clone();
1679 update_privilege_fn(&mut cluster.privileges);
1680 tx.update_cluster(*id, cluster.into())?;
1681 }
1682 ObjectId::Database(id) => {
1683 let mut database = state.get_database(id).clone();
1684 update_privilege_fn(&mut database.privileges);
1685 tx.update_database(*id, database.into())?;
1686 }
1687 ObjectId::NetworkPolicy(id) => {
1688 let mut policy = state.get_network_policy(id).clone();
1689 update_privilege_fn(&mut policy.privileges);
1690 tx.update_network_policy(*id, policy.into())?;
1691 }
1692 ObjectId::Schema((database_spec, schema_spec)) => {
1693 let schema_id = schema_spec.clone().into();
1694 let mut schema = state
1695 .get_schema(
1696 database_spec,
1697 schema_spec,
1698 session
1699 .map(|session| session.conn_id())
1700 .unwrap_or(&SYSTEM_CONN_ID),
1701 )
1702 .clone();
1703 update_privilege_fn(&mut schema.privileges);
1704 tx.update_schema(schema_id, schema.into())?;
1705 }
1706 ObjectId::Item(id) => {
1707 let entry = state.get_entry(id);
1708 let mut new_entry = entry.clone();
1709 update_privilege_fn(&mut new_entry.privileges);
1710 if !new_entry.item().is_temporary() {
1711 tx.update_item(*id, new_entry.into())?;
1712 } else {
1713 temporary_item_updates
1714 .push((entry.clone().into(), StateDiff::Retraction));
1715 temporary_item_updates
1716 .push((new_entry.into(), StateDiff::Addition));
1717 }
1718 }
1719 ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
1720 },
1721 SystemObjectId::System => {
1722 let mut system_privileges = state.system_privileges.clone();
1723 update_privilege_fn(&mut system_privileges);
1724 let new_privilege =
1725 system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
1726 tx.set_system_privilege(
1727 privilege.grantee,
1728 privilege.grantor,
1729 new_privilege.map(|new_privilege| new_privilege.acl_mode),
1730 )?;
1731 }
1732 }
1733 let object_type = state.get_system_object_type(&target_id);
1734 let object_id_str = match &target_id {
1735 SystemObjectId::System => "SYSTEM".to_string(),
1736 SystemObjectId::Object(id) => id.to_string(),
1737 };
1738 CatalogState::add_to_audit_log(
1739 &state.system_configuration,
1740 oracle_write_ts,
1741 session,
1742 tx,
1743 audit_events,
1744 variant.into(),
1745 system_object_type_to_audit_object_type(&object_type),
1746 EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
1747 object_id: object_id_str,
1748 grantee_id: privilege.grantee.to_string(),
1749 grantor_id: privilege.grantor.to_string(),
1750 privileges: privilege.acl_mode.to_string(),
1751 }),
1752 )?;
1753 }
1754 Op::UpdateDefaultPrivilege {
1755 privilege_object,
1756 privilege_acl_item,
1757 variant,
1758 } => {
1759 let mut default_privileges = state.default_privileges.clone();
1760 match variant {
1761 UpdatePrivilegeVariant::Grant => default_privileges
1762 .grant(privilege_object.clone(), privilege_acl_item.clone()),
1763 UpdatePrivilegeVariant::Revoke => {
1764 default_privileges.revoke(&privilege_object, &privilege_acl_item)
1765 }
1766 }
1767 let new_acl_mode = default_privileges
1768 .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
1769 tx.set_default_privilege(
1770 privilege_object.role_id,
1771 privilege_object.database_id,
1772 privilege_object.schema_id,
1773 privilege_object.object_type,
1774 privilege_acl_item.grantee,
1775 new_acl_mode.cloned(),
1776 )?;
1777 CatalogState::add_to_audit_log(
1778 &state.system_configuration,
1779 oracle_write_ts,
1780 session,
1781 tx,
1782 audit_events,
1783 variant.into(),
1784 object_type_to_audit_object_type(privilege_object.object_type),
1785 EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
1786 role_id: privilege_object.role_id.to_string(),
1787 database_id: privilege_object.database_id.map(|id| id.to_string()),
1788 schema_id: privilege_object.schema_id.map(|id| id.to_string()),
1789 grantee_id: privilege_acl_item.grantee.to_string(),
1790 privileges: privilege_acl_item.acl_mode.to_string(),
1791 }),
1792 )?;
1793 }
1794 Op::RenameCluster {
1795 id,
1796 name,
1797 to_name,
1798 check_reserved_names,
1799 } => {
1800 if id.is_system() {
1801 return Err(AdapterError::Catalog(Error::new(
1802 ErrorKind::ReadOnlyCluster(name.clone()),
1803 )));
1804 }
1805 if check_reserved_names && is_reserved_name(&to_name) {
1806 return Err(AdapterError::Catalog(Error::new(
1807 ErrorKind::ReservedClusterName(to_name),
1808 )));
1809 }
1810 tx.rename_cluster(id, &name, &to_name)?;
1811 CatalogState::add_to_audit_log(
1812 &state.system_configuration,
1813 oracle_write_ts,
1814 session,
1815 tx,
1816 audit_events,
1817 EventType::Alter,
1818 ObjectType::Cluster,
1819 EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
1820 id: id.to_string(),
1821 old_name: name.clone(),
1822 new_name: to_name.clone(),
1823 }),
1824 )?;
1825 info!("rename cluster {name} to {to_name}");
1826 }
1827 Op::RenameClusterReplica {
1828 cluster_id,
1829 replica_id,
1830 name,
1831 to_name,
1832 } => {
1833 if is_reserved_name(&to_name) {
1834 return Err(AdapterError::Catalog(Error::new(
1835 ErrorKind::ReservedReplicaName(to_name),
1836 )));
1837 }
1838 tx.rename_cluster_replica(replica_id, &name, &to_name)?;
1839 CatalogState::add_to_audit_log(
1840 &state.system_configuration,
1841 oracle_write_ts,
1842 session,
1843 tx,
1844 audit_events,
1845 EventType::Alter,
1846 ObjectType::ClusterReplica,
1847 EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
1848 cluster_id: cluster_id.to_string(),
1849 replica_id: replica_id.to_string(),
1850 old_name: name.replica.as_str().to_string(),
1851 new_name: to_name.clone(),
1852 }),
1853 )?;
1854 info!("rename cluster replica {name} to {to_name}");
1855 }
1856 Op::RenameItem {
1857 id,
1858 to_name,
1859 current_full_name,
1860 } => {
1861 let mut updates = Vec::new();
1862
1863 let entry = state.get_entry(&id);
1864 if let CatalogItem::Type(_) = entry.item() {
1865 return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
1866 current_full_name.to_string(),
1867 ))));
1868 }
1869
1870 if entry.id().is_system() {
1871 let name = state
1872 .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
1873 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
1874 name.to_string(),
1875 ))));
1876 }
1877
1878 let mut to_full_name = current_full_name.clone();
1879 to_full_name.item.clone_from(&to_name);
1880
1881 let mut to_qualified_name = entry.name().clone();
1882 to_qualified_name.item.clone_from(&to_name);
1883
1884 let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
1885 id: id.to_string(),
1886 old_name: Self::full_name_detail(¤t_full_name),
1887 new_name: Self::full_name_detail(&to_full_name),
1888 });
1889 if Self::should_audit_log_item(entry.item()) {
1890 CatalogState::add_to_audit_log(
1891 &state.system_configuration,
1892 oracle_write_ts,
1893 session,
1894 tx,
1895 audit_events,
1896 EventType::Alter,
1897 catalog_type_to_audit_object_type(entry.item().typ()),
1898 details,
1899 )?;
1900 }
1901
1902 let mut new_entry = entry.clone();
1904 new_entry.name.item.clone_from(&to_name);
1905 new_entry.item = entry
1906 .item()
1907 .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
1908 .map_err(|e| {
1909 Error::new(ErrorKind::from(AmbiguousRename {
1910 depender: state
1911 .resolve_full_name(entry.name(), entry.conn_id())
1912 .to_string(),
1913 dependee: state
1914 .resolve_full_name(entry.name(), entry.conn_id())
1915 .to_string(),
1916 message: e,
1917 }))
1918 })?;
1919
1920 for id in entry.referenced_by() {
1921 let dependent_item = state.get_entry(id);
1922 let mut to_entry = dependent_item.clone();
1923 to_entry.item = dependent_item
1924 .item()
1925 .rename_item_refs(
1926 current_full_name.clone(),
1927 to_full_name.item.clone(),
1928 false,
1929 )
1930 .map_err(|e| {
1931 Error::new(ErrorKind::from(AmbiguousRename {
1932 depender: state
1933 .resolve_full_name(
1934 dependent_item.name(),
1935 dependent_item.conn_id(),
1936 )
1937 .to_string(),
1938 dependee: state
1939 .resolve_full_name(entry.name(), entry.conn_id())
1940 .to_string(),
1941 message: e,
1942 }))
1943 })?;
1944
1945 if !to_entry.item().is_temporary() {
1946 tx.update_item(*id, to_entry.into())?;
1947 } else {
1948 temporary_item_updates
1949 .push((dependent_item.clone().into(), StateDiff::Retraction));
1950 temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
1951 }
1952 updates.push(*id);
1953 }
1954 if !new_entry.item().is_temporary() {
1955 tx.update_item(id, new_entry.into())?;
1956 } else {
1957 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
1958 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
1959 }
1960
1961 updates.push(id);
1962 for id in updates {
1963 Self::log_update(state, &id);
1964 }
1965 }
1966 Op::RenameSchema {
1967 database_spec,
1968 schema_spec,
1969 new_name,
1970 check_reserved_names,
1971 } => {
1972 if check_reserved_names && is_reserved_name(&new_name) {
1973 return Err(AdapterError::Catalog(Error::new(
1974 ErrorKind::ReservedSchemaName(new_name),
1975 )));
1976 }
1977
1978 let conn_id = session
1979 .map(|session| session.conn_id())
1980 .unwrap_or(&SYSTEM_CONN_ID);
1981
1982 let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
1983 let cur_name = schema.name().schema.clone();
1984
1985 let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
1986 return Err(AdapterError::Catalog(Error::new(
1987 ErrorKind::AmbientSchemaRename(cur_name),
1988 )));
1989 };
1990 let database = state.get_database(&database_id);
1991 let database_name = &database.name;
1992
1993 let mut updates = Vec::new();
1994 let mut items_to_update = BTreeMap::new();
1995
1996 let mut update_item = |id| {
1997 if items_to_update.contains_key(id) {
1998 return Ok(());
1999 }
2000
2001 let entry = state.get_entry(id);
2002
2003 let mut new_entry = entry.clone();
2005 new_entry.item = entry
2006 .item
2007 .rename_schema_refs(database_name, &cur_name, &new_name)
2008 .map_err(|(s, _i)| {
2009 Error::new(ErrorKind::from(AmbiguousRename {
2010 depender: state
2011 .resolve_full_name(entry.name(), entry.conn_id())
2012 .to_string(),
2013 dependee: format!("{database_name}.{cur_name}"),
2014 message: format!("ambiguous reference to schema named {s}"),
2015 }))
2016 })?;
2017
2018 if !new_entry.item().is_temporary() {
2020 items_to_update.insert(*id, new_entry.into());
2021 } else {
2022 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2023 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2024 }
2025 updates.push(id);
2026
2027 Ok::<_, AdapterError>(())
2028 };
2029
2030 for (_name, item_id) in &schema.items {
2032 update_item(item_id)?;
2034
2035 for id in state.get_entry(item_id).referenced_by() {
2037 update_item(id)?;
2038 }
2039 }
2040 tx.update_items(items_to_update)?;
2043
2044 let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2046 let schema_name = schema.name().schema.clone();
2047 return Err(AdapterError::Catalog(crate::catalog::Error::new(
2048 crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2049 )));
2050 };
2051
2052 let database_name = database_spec
2054 .id()
2055 .map(|id| state.get_database(&id).name.clone());
2056 let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2057 id: schema_id.to_string(),
2058 old_name: schema.name().schema.clone(),
2059 new_name: new_name.clone(),
2060 database_name,
2061 });
2062 CatalogState::add_to_audit_log(
2063 &state.system_configuration,
2064 oracle_write_ts,
2065 session,
2066 tx,
2067 audit_events,
2068 EventType::Alter,
2069 mz_audit_log::ObjectType::Schema,
2070 details,
2071 )?;
2072
2073 let mut new_schema = schema.clone();
2075 new_schema.name.schema.clone_from(&new_name);
2076 tx.update_schema(schema_id, new_schema.into())?;
2077
2078 for id in updates {
2079 Self::log_update(state, id);
2080 }
2081 }
2082 Op::UpdateOwner { id, new_owner } => {
2083 let conn_id = session
2084 .map(|session| session.conn_id())
2085 .unwrap_or(&SYSTEM_CONN_ID);
2086 let old_owner = state
2087 .get_owner_id(&id, conn_id)
2088 .expect("cannot update the owner of an object without an owner");
2089 match &id {
2090 ObjectId::Cluster(id) => {
2091 let mut cluster = state.get_cluster(*id).clone();
2092 if id.is_system() {
2093 return Err(AdapterError::Catalog(Error::new(
2094 ErrorKind::ReadOnlyCluster(cluster.name),
2095 )));
2096 }
2097 Self::update_privilege_owners(
2098 &mut cluster.privileges,
2099 cluster.owner_id,
2100 new_owner,
2101 );
2102 cluster.owner_id = new_owner;
2103 tx.update_cluster(*id, cluster.into())?;
2104 }
2105 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2106 let cluster = state.get_cluster(*cluster_id);
2107 let mut replica = cluster
2108 .replica(*replica_id)
2109 .expect("catalog out of sync")
2110 .clone();
2111 if replica_id.is_system() {
2112 return Err(AdapterError::Catalog(Error::new(
2113 ErrorKind::ReadOnlyClusterReplica(replica.name),
2114 )));
2115 }
2116 replica.owner_id = new_owner;
2117 tx.update_cluster_replica(*replica_id, replica.into())?;
2118 }
2119 ObjectId::Database(id) => {
2120 let mut database = state.get_database(id).clone();
2121 if id.is_system() {
2122 return Err(AdapterError::Catalog(Error::new(
2123 ErrorKind::ReadOnlyDatabase(database.name),
2124 )));
2125 }
2126 Self::update_privilege_owners(
2127 &mut database.privileges,
2128 database.owner_id,
2129 new_owner,
2130 );
2131 database.owner_id = new_owner;
2132 tx.update_database(*id, database.clone().into())?;
2133 }
2134 ObjectId::Schema((database_spec, schema_spec)) => {
2135 let schema_id: SchemaId = schema_spec.clone().into();
2136 let mut schema = state
2137 .get_schema(database_spec, schema_spec, conn_id)
2138 .clone();
2139 if schema_id.is_system() {
2140 let name = schema.name();
2141 let full_name = state.resolve_full_schema_name(name);
2142 return Err(AdapterError::Catalog(Error::new(
2143 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2144 )));
2145 }
2146 Self::update_privilege_owners(
2147 &mut schema.privileges,
2148 schema.owner_id,
2149 new_owner,
2150 );
2151 schema.owner_id = new_owner;
2152 tx.update_schema(schema_id, schema.into())?;
2153 }
2154 ObjectId::Item(id) => {
2155 let entry = state.get_entry(id);
2156 let mut new_entry = entry.clone();
2157 if id.is_system() {
2158 let full_name = state.resolve_full_name(
2159 new_entry.name(),
2160 session.map(|session| session.conn_id()),
2161 );
2162 return Err(AdapterError::Catalog(Error::new(
2163 ErrorKind::ReadOnlyItem(full_name.to_string()),
2164 )));
2165 }
2166 Self::update_privilege_owners(
2167 &mut new_entry.privileges,
2168 new_entry.owner_id,
2169 new_owner,
2170 );
2171 new_entry.owner_id = new_owner;
2172 if !new_entry.item().is_temporary() {
2173 tx.update_item(*id, new_entry.into())?;
2174 } else {
2175 temporary_item_updates
2176 .push((entry.clone().into(), StateDiff::Retraction));
2177 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2178 }
2179 }
2180 ObjectId::NetworkPolicy(id) => {
2181 let mut policy = state.get_network_policy(id).clone();
2182 if id.is_system() {
2183 return Err(AdapterError::Catalog(Error::new(
2184 ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2185 )));
2186 }
2187 Self::update_privilege_owners(
2188 &mut policy.privileges,
2189 policy.owner_id,
2190 new_owner,
2191 );
2192 policy.owner_id = new_owner;
2193 tx.update_network_policy(*id, policy.into())?;
2194 }
2195 ObjectId::Role(_) => unreachable!("roles have no owner"),
2196 }
2197 let object_type = state.get_object_type(&id);
2198 CatalogState::add_to_audit_log(
2199 &state.system_configuration,
2200 oracle_write_ts,
2201 session,
2202 tx,
2203 audit_events,
2204 EventType::Alter,
2205 object_type_to_audit_object_type(object_type),
2206 EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2207 object_id: id.to_string(),
2208 old_owner_id: old_owner.to_string(),
2209 new_owner_id: new_owner.to_string(),
2210 }),
2211 )?;
2212 }
2213 Op::UpdateClusterConfig { id, name, config } => {
2214 let mut cluster = state.get_cluster(id).clone();
2215 cluster.config = config;
2216 tx.update_cluster(id, cluster.into())?;
2217 info!("update cluster {}", name);
2218
2219 CatalogState::add_to_audit_log(
2220 &state.system_configuration,
2221 oracle_write_ts,
2222 session,
2223 tx,
2224 audit_events,
2225 EventType::Alter,
2226 ObjectType::Cluster,
2227 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2228 id: id.to_string(),
2229 name,
2230 }),
2231 )?;
2232 }
2233 Op::UpdateClusterReplicaConfig {
2234 replica_id,
2235 cluster_id,
2236 config,
2237 } => {
2238 let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2239 info!("update replica {}", replica.name);
2240 tx.update_cluster_replica(
2241 replica_id,
2242 mz_catalog::durable::ClusterReplica {
2243 cluster_id,
2244 replica_id,
2245 name: replica.name.clone(),
2246 config: config.clone().into(),
2247 owner_id: replica.owner_id,
2248 },
2249 )?;
2250 }
2251 Op::UpdateItem { id, name, to_item } => {
2252 let mut entry = state.get_entry(&id).clone();
2253 entry.name = name.clone();
2254 entry.item = to_item.clone();
2255 tx.update_item(id, entry.into())?;
2256
2257 if Self::should_audit_log_item(&to_item) {
2258 let mut full_name = Self::full_name_detail(
2259 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2260 );
2261 full_name.item = name.item;
2262
2263 CatalogState::add_to_audit_log(
2264 &state.system_configuration,
2265 oracle_write_ts,
2266 session,
2267 tx,
2268 audit_events,
2269 EventType::Alter,
2270 catalog_type_to_audit_object_type(to_item.typ()),
2271 EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2272 id: id.to_string(),
2273 name: full_name,
2274 }),
2275 )?;
2276 }
2277
2278 Self::log_update(state, &id);
2279 }
2280 Op::UpdateSystemConfiguration { name, value } => {
2281 let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2282 tx.upsert_system_config(&name, parsed_value.clone())?;
2283 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2288 let with_0dt_deployment_max_wait =
2289 Duration::parse(VarInput::Flat(&parsed_value))
2290 .expect("parsing succeeded above");
2291 tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2292 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2293 let with_0dt_deployment_ddl_check_interval =
2294 Duration::parse(VarInput::Flat(&parsed_value))
2295 .expect("parsing succeeded above");
2296 tx.set_0dt_deployment_ddl_check_interval(
2297 with_0dt_deployment_ddl_check_interval,
2298 )?;
2299 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2300 let panic_after_timeout =
2301 strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2302 tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2303 }
2304
2305 CatalogState::add_to_audit_log(
2306 &state.system_configuration,
2307 oracle_write_ts,
2308 session,
2309 tx,
2310 audit_events,
2311 EventType::Alter,
2312 ObjectType::System,
2313 EventDetails::SetV1(mz_audit_log::SetV1 {
2314 name,
2315 value: Some(value.borrow().to_vec().join(", ")),
2316 }),
2317 )?;
2318 }
2319 Op::ResetSystemConfiguration { name } => {
2320 tx.remove_system_config(&name);
2321 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2326 tx.reset_0dt_deployment_max_wait()?;
2327 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2328 tx.reset_0dt_deployment_ddl_check_interval()?;
2329 }
2330
2331 CatalogState::add_to_audit_log(
2332 &state.system_configuration,
2333 oracle_write_ts,
2334 session,
2335 tx,
2336 audit_events,
2337 EventType::Alter,
2338 ObjectType::System,
2339 EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2340 )?;
2341 }
2342 Op::ResetAllSystemConfiguration => {
2343 tx.clear_system_configs();
2344 tx.reset_0dt_deployment_max_wait()?;
2345 tx.reset_0dt_deployment_ddl_check_interval()?;
2346
2347 CatalogState::add_to_audit_log(
2348 &state.system_configuration,
2349 oracle_write_ts,
2350 session,
2351 tx,
2352 audit_events,
2353 EventType::Alter,
2354 ObjectType::System,
2355 EventDetails::ResetAllV1,
2356 )?;
2357 }
2358 Op::WeirdStorageUsageUpdates {
2359 object_id,
2360 size_bytes,
2361 collection_timestamp,
2362 } => {
2363 let id = tx.allocate_storage_usage_ids()?;
2364 let metric =
2365 VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2366 let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2367 let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2368 weird_builtin_table_update = Some(builtin_table_update);
2369 }
2370 };
2371 Ok((weird_builtin_table_update, temporary_item_updates))
2372 }
2373
2374 fn log_update(state: &CatalogState, id: &CatalogItemId) {
2375 let entry = state.get_entry(id);
2376 info!(
2377 "update {} {} ({})",
2378 entry.item_type(),
2379 state.resolve_full_name(entry.name(), entry.conn_id()),
2380 id
2381 );
2382 }
2383
2384 fn update_privilege_owners(
2388 privileges: &mut PrivilegeMap,
2389 old_owner: RoleId,
2390 new_owner: RoleId,
2391 ) {
2392 let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2394
2395 let mut new_present = false;
2396 for privilege in flat_privileges.iter_mut() {
2397 if privilege.grantor == old_owner {
2400 privilege.grantor = new_owner;
2401 } else if privilege.grantor == new_owner {
2402 new_present = true;
2403 }
2404 if privilege.grantee == old_owner {
2406 privilege.grantee = new_owner;
2407 } else if privilege.grantee == new_owner {
2408 new_present = true;
2409 }
2410 }
2411
2412 if new_present {
2416 let privilege_map: BTreeMap<_, Vec<_>> =
2418 flat_privileges
2419 .into_iter()
2420 .fold(BTreeMap::new(), |mut accum, privilege| {
2421 accum
2422 .entry((privilege.grantee, privilege.grantor))
2423 .or_default()
2424 .push(privilege);
2425 accum
2426 });
2427
2428 flat_privileges = privilege_map
2430 .into_iter()
2431 .map(|((grantee, grantor), values)|
2432 values.into_iter().fold(
2434 MzAclItem::empty(grantee, grantor),
2435 |mut accum, mz_aclitem| {
2436 accum.acl_mode =
2437 accum.acl_mode.union(mz_aclitem.acl_mode);
2438 accum
2439 },
2440 ))
2441 .collect();
2442 }
2443
2444 *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2445 }
2446}
2447
2448#[derive(Debug, Default)]
2456pub(crate) struct ObjectsToDrop {
2457 pub comments: BTreeSet<CommentObjectId>,
2458 pub databases: BTreeSet<DatabaseId>,
2459 pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2460 pub clusters: BTreeSet<ClusterId>,
2461 pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2462 pub roles: BTreeSet<RoleId>,
2463 pub items: Vec<CatalogItemId>,
2464 pub network_policies: BTreeSet<NetworkPolicyId>,
2465}
2466
2467impl ObjectsToDrop {
2468 pub fn generate(
2469 drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2470 state: &CatalogState,
2471 session: Option<&ConnMeta>,
2472 ) -> Result<Self, AdapterError> {
2473 let mut delta = ObjectsToDrop::default();
2474
2475 for drop_object_info in drop_object_infos {
2476 delta.add_item(drop_object_info, state, session)?;
2477 }
2478
2479 Ok(delta)
2480 }
2481
2482 fn add_item(
2483 &mut self,
2484 drop_object_info: DropObjectInfo,
2485 state: &CatalogState,
2486 session: Option<&ConnMeta>,
2487 ) -> Result<(), AdapterError> {
2488 self.comments
2489 .insert(state.get_comment_id(drop_object_info.to_object_id()));
2490
2491 match drop_object_info {
2492 DropObjectInfo::Database(database_id) => {
2493 let database = &state.database_by_id[&database_id];
2494 if database_id.is_system() {
2495 return Err(AdapterError::Catalog(Error::new(
2496 ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2497 )));
2498 }
2499
2500 self.databases.insert(database_id);
2501 }
2502 DropObjectInfo::Schema((database_spec, schema_spec)) => {
2503 let schema = state.get_schema(
2504 &database_spec,
2505 &schema_spec,
2506 session
2507 .map(|session| session.conn_id())
2508 .unwrap_or(&SYSTEM_CONN_ID),
2509 );
2510 let schema_id: SchemaId = schema_spec.into();
2511 if schema_id.is_system() {
2512 let name = schema.name();
2513 let full_name = state.resolve_full_schema_name(name);
2514 return Err(AdapterError::Catalog(Error::new(
2515 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2516 )));
2517 }
2518
2519 self.schemas.insert(schema_spec, database_spec);
2520 }
2521 DropObjectInfo::Role(role_id) => {
2522 let name = state.get_role(&role_id).name().to_string();
2523 if role_id.is_system() || role_id.is_predefined() {
2524 return Err(AdapterError::Catalog(Error::new(
2525 ErrorKind::ReservedRoleName(name.clone()),
2526 )));
2527 }
2528 state.ensure_not_reserved_role(&role_id)?;
2529
2530 self.roles.insert(role_id);
2531 }
2532 DropObjectInfo::Cluster(cluster_id) => {
2533 let cluster = state.get_cluster(cluster_id);
2534 let name = &cluster.name;
2535 if cluster_id.is_system() {
2536 return Err(AdapterError::Catalog(Error::new(
2537 ErrorKind::ReadOnlyCluster(name.clone()),
2538 )));
2539 }
2540
2541 self.clusters.insert(cluster_id);
2542 }
2543 DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2544 let cluster = state.get_cluster(cluster_id);
2545 let replica = cluster.replica(replica_id).expect("Must exist");
2546
2547 self.replicas
2548 .insert(replica.replica_id, (cluster.id, reason));
2549 }
2550 DropObjectInfo::Item(item_id) => {
2551 let entry = state.get_entry(&item_id);
2552 if item_id.is_system() {
2553 let name = entry.name();
2554 let full_name =
2555 state.resolve_full_name(name, session.map(|session| session.conn_id()));
2556 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2557 full_name.to_string(),
2558 ))));
2559 }
2560
2561 self.items.push(item_id);
2562 }
2563 DropObjectInfo::NetworkPolicy(network_policy_id) => {
2564 let policy = state.get_network_policy(&network_policy_id);
2565 let name = &policy.name;
2566 if network_policy_id.is_system() {
2567 return Err(AdapterError::Catalog(Error::new(
2568 ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
2569 )));
2570 }
2571
2572 self.network_policies.insert(network_policy_id);
2573 }
2574 }
2575
2576 Ok(())
2577 }
2578}
2579
2580#[cfg(test)]
2581mod tests {
2582 use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
2583 use mz_repr::role_id::RoleId;
2584
2585 use crate::catalog::Catalog;
2586
2587 #[mz_ore::test]
2588 fn test_update_privilege_owners() {
2589 let old_owner = RoleId::User(1);
2590 let new_owner = RoleId::User(2);
2591 let other_role = RoleId::User(3);
2592
2593 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2595 MzAclItem {
2596 grantee: other_role,
2597 grantor: old_owner,
2598 acl_mode: AclMode::UPDATE,
2599 },
2600 MzAclItem {
2601 grantee: other_role,
2602 grantor: new_owner,
2603 acl_mode: AclMode::SELECT,
2604 },
2605 ]);
2606 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2607 assert_eq!(1, privileges.all_values().count());
2608 assert_eq!(
2609 vec![MzAclItem {
2610 grantee: other_role,
2611 grantor: new_owner,
2612 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2613 }],
2614 privileges.all_values_owned().collect::<Vec<_>>()
2615 );
2616
2617 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2619 MzAclItem {
2620 grantee: old_owner,
2621 grantor: other_role,
2622 acl_mode: AclMode::UPDATE,
2623 },
2624 MzAclItem {
2625 grantee: new_owner,
2626 grantor: other_role,
2627 acl_mode: AclMode::SELECT,
2628 },
2629 ]);
2630 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2631 assert_eq!(1, privileges.all_values().count());
2632 assert_eq!(
2633 vec![MzAclItem {
2634 grantee: new_owner,
2635 grantor: other_role,
2636 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2637 }],
2638 privileges.all_values_owned().collect::<Vec<_>>()
2639 );
2640
2641 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2643 MzAclItem {
2644 grantee: old_owner,
2645 grantor: old_owner,
2646 acl_mode: AclMode::UPDATE,
2647 },
2648 MzAclItem {
2649 grantee: new_owner,
2650 grantor: new_owner,
2651 acl_mode: AclMode::SELECT,
2652 },
2653 ]);
2654 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2655 assert_eq!(1, privileges.all_values().count());
2656 assert_eq!(
2657 vec![MzAclItem {
2658 grantee: new_owner,
2659 grantor: new_owner,
2660 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2661 }],
2662 privileges.all_values_owned().collect::<Vec<_>>()
2663 );
2664 }
2665}