1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22 WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26 ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Transaction};
31use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
32use mz_catalog::memory::objects::{
33 CatalogItem, ClusterConfig, DataSourceDesc, SourceReferences, StateDiff, StateUpdate,
34 StateUpdateKind, TemporaryItem,
35};
36use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
37use mz_controller_types::{ClusterId, ReplicaId};
38use mz_ore::collections::HashSet;
39use mz_ore::instrument;
40use mz_ore::now::EpochMillis;
41use mz_persist_types::ShardId;
42use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
43use mz_repr::network_policy_id::NetworkPolicyId;
44use mz_repr::role_id::RoleId;
45use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
46use mz_sql::ast::RawDataType;
47use mz_sql::catalog::{
48 CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
49 CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, PasswordAction, PasswordConfig,
50 RoleAttributesRaw, RoleMembership, RoleVars,
51};
52use mz_sql::names::{
53 CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
54 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
55};
56use mz_sql::plan::{NetworkPolicyRule, PlanError};
57use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
58use mz_sql::session::vars::OwnedVarInput;
59use mz_sql::session::vars::{Value as VarValue, VarInput};
60use mz_sql::{DEFAULT_SCHEMA, rbac};
61use mz_sql_parser::ast::{QualifiedReplica, Value};
62use mz_storage_client::storage_collections::StorageCollections;
63use tracing::{info, trace};
64
65use crate::AdapterError;
66use crate::catalog::{
67 BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
68 catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
69 is_reserved_role_name, object_type_to_audit_object_type,
70 system_object_type_to_audit_object_type,
71};
72use crate::coord::ConnMeta;
73use crate::coord::cluster_scheduling::SchedulingDecision;
74use crate::util::ResultExt;
75
/// A single catalog operation, applied in order by `Catalog::transact`.
///
/// Notes on individual variants describe what `Catalog::transact_op` does for
/// that variant. Variants without a note are not described here.
#[derive(Debug, Clone)]
pub enum Op {
    /// Updates the retain-history setting of item `id`. System items are
    /// rejected with `ErrorKind::ReadOnlyItem`. Non-temporary items get an
    /// `Alter` audit event recording the old and new history values.
    AlterRetainHistory {
        id: CatalogItemId,
        value: Option<Value>,
        window: CompactionWindow,
    },
    /// Replaces the attributes and vars of role `id`. When `nopassword` is set
    /// the password is cleared; otherwise a password present in `attributes`
    /// is set, and if there is none the password is left unchanged.
    AlterRole {
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        nopassword: bool,
        vars: RoleVars,
    },
    /// Replaces the rules of network policy `id`. `name` is checked against
    /// reserved names and used for the audit log; the handler ignores
    /// `owner_id`.
    AlterNetworkPolicy {
        id: NetworkPolicyId,
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    /// Adds column `name` to the table `id`. `new_global_id` is recorded as the
    /// collection for the new table version and registered against the shard
    /// of the table's latest existing global ID. Non-table items are rejected.
    AlterAddColumn {
        id: CatalogItemId,
        new_global_id: GlobalId,
        name: ColumnName,
        typ: SqlColumnType,
        sql: RawDataType,
    },
    /// Creates a user database together with a schema named `DEFAULT_SCHEMA`
    /// inside it; both are audit logged.
    CreateDatabase {
        name: String,
        owner_id: RoleId,
    },
    /// Creates a user schema. Fails for reserved schema names and when
    /// `database_id` is `ResolvedDatabaseSpecifier::Ambient`.
    CreateSchema {
        database_id: ResolvedDatabaseSpecifier,
        schema_name: String,
        owner_id: RoleId,
    },
    /// Creates a user role with empty membership and default vars. Fails for
    /// reserved role names.
    CreateRole {
        name: String,
        attributes: RoleAttributesRaw,
    },
    /// Creates a user cluster, allocating an introspection source index ID for
    /// each entry of `introspection_sources`. Fails for reserved names.
    CreateCluster {
        id: ClusterId,
        name: String,
        introspection_sources: Vec<&'static BuiltinLog>,
        owner_id: RoleId,
        config: ClusterConfig,
    },
    /// Creates a replica in `cluster_id`. Fails for reserved names. An audit
    /// event (including `reason`) is only written for managed replica
    /// locations.
    CreateClusterReplica {
        cluster_id: ClusterId,
        name: String,
        config: ReplicaConfig,
        owner_id: RoleId,
        reason: ReplicaCreateDropReason,
    },
    /// Creates a catalog item. Temporary items are not inserted into the
    /// durable transaction; they are returned as temporary item updates
    /// instead. Non-temporary items must not depend on temporary items.
    CreateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        item: CatalogItem,
        owner_id: RoleId,
    },
    /// Creates a network policy. Fails if a policy named `name` already exists
    /// or if `name` is reserved.
    CreateNetworkPolicy {
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    Comment {
        object_id: CommentObjectId,
        sub_component: Option<usize>,
        comment: Option<String>,
    },
    /// Drops the listed objects. `Catalog::transact` also inspects this variant
    /// up front to collect the IDs of the items being dropped.
    DropObjects(Vec<DropObjectInfo>),
    GrantRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    RenameCluster {
        id: ClusterId,
        name: String,
        to_name: String,
        check_reserved_names: bool,
    },
    RenameClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: QualifiedReplica,
        to_name: String,
    },
    RenameItem {
        id: CatalogItemId,
        current_full_name: FullItemName,
        to_name: String,
    },
    RenameSchema {
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        new_name: String,
        check_reserved_names: bool,
    },
    UpdateOwner {
        id: ObjectId,
        new_owner: RoleId,
    },
    UpdatePrivilege {
        target_id: SystemObjectId,
        privilege: MzAclItem,
        variant: UpdatePrivilegeVariant,
    },
    UpdateDefaultPrivilege {
        privilege_object: DefaultPrivilegeObject,
        privilege_acl_item: DefaultPrivilegeAclItem,
        variant: UpdatePrivilegeVariant,
    },
    RevokeRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    UpdateClusterConfig {
        id: ClusterId,
        name: String,
        config: ClusterConfig,
    },
    UpdateClusterReplicaConfig {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        config: ReplicaConfig,
    },
    UpdateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        to_item: CatalogItem,
    },
    UpdateSourceReferences {
        source_id: CatalogItemId,
        references: SourceReferences,
    },
    UpdateSystemConfiguration {
        name: String,
        value: OwnedVarInput,
    },
    ResetSystemConfiguration {
        name: String,
    },
    ResetAllSystemConfiguration,
    // NOTE(review): presumably the source of the ad-hoc builtin table update
    // that `transact_op` can return alongside its temporary item updates --
    // confirm against the handler for this variant.
    WeirdStorageUsageUpdates {
        object_id: Option<String>,
        size_bytes: u64,
        collection_timestamp: EpochMillis,
    },
    /// Marks the transaction as a dry run. Must be the final op and must not be
    /// the only op (asserted by `Catalog::transact_inner`). The preceding ops
    /// are applied to a copy of the in-memory state, and
    /// `AdapterError::TransactionDryRun` carrying those ops and the resulting
    /// state is returned instead of a success value.
    TransactionDryRun,
}
239
/// Identifies one object to drop, as carried by [`Op::DropObjects`].
///
/// Mirrors the variants of `ObjectId`, except that cluster replicas also carry
/// the [`ReplicaCreateDropReason`] for the drop.
#[derive(Debug, Clone)]
pub enum DropObjectInfo {
    Cluster(ClusterId),
    /// The cluster, the replica within it, and why the replica is dropped.
    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
    Database(DatabaseId),
    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
    Role(RoleId),
    Item(CatalogItemId),
    NetworkPolicy(NetworkPolicyId),
}
253
254impl DropObjectInfo {
255 pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
258 match id {
259 ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
260 ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
261 cluster_id,
262 replica_id,
263 ReplicaCreateDropReason::Manual,
264 )),
265 ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
266 ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
267 ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
268 ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
269 ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
270 }
271 }
272
273 fn to_object_id(&self) -> ObjectId {
276 match &self {
277 DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
278 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
279 ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
280 }
281 DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
282 DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
283 DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
284 DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
285 DropObjectInfo::NetworkPolicy(network_policy_id) => {
286 ObjectId::NetworkPolicy(network_policy_id.clone())
287 }
288 }
289 }
290}
291
/// Why a cluster replica is being created or dropped.
///
/// Converted into its audit-log representation by
/// [`ReplicaCreateDropReason::into_audit_log`].
#[derive(Debug, Clone)]
pub enum ReplicaCreateDropReason {
    /// Audit logged as `CreateOrDropClusterReplicaReasonV1::Manual`.
    Manual,
    /// Audit logged as `CreateOrDropClusterReplicaReasonV1::Schedule`, together
    /// with the audit-log form of the carried scheduling decisions.
    ClusterScheduling(Vec<SchedulingDecision>),
}
304
305impl ReplicaCreateDropReason {
306 pub fn into_audit_log(
307 self,
308 ) -> (
309 CreateOrDropClusterReplicaReasonV1,
310 Option<SchedulingDecisionsWithReasonsV2>,
311 ) {
312 let (reason, scheduling_policies) = match self {
313 ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
314 ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
315 CreateOrDropClusterReplicaReasonV1::Schedule,
316 Some(scheduling_decisions),
317 ),
318 };
319 (
320 reason,
321 scheduling_policies
322 .as_ref()
323 .map(SchedulingDecision::reasons_to_audit_log_reasons),
324 )
325 }
326}
327
/// The outputs of a successful [`Catalog::transact`] call.
pub struct TransactionResult {
    /// Builtin table updates accumulated while applying the ops.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Audit events recorded while applying the ops.
    pub audit_events: Vec<VersionedEvent>,
}
332
333impl Catalog {
334 fn should_audit_log_item(item: &CatalogItem) -> bool {
335 !item.is_temporary()
336 }
337
338 fn temporary_ids(
341 &self,
342 ops: &[Op],
343 temporary_drops: BTreeSet<(&ConnectionId, String)>,
344 ) -> Result<BTreeSet<CatalogItemId>, Error> {
345 let mut creating = BTreeSet::new();
346 let mut temporary_ids = BTreeSet::new();
347 for op in ops.iter() {
348 if let Op::CreateItem {
349 id,
350 name,
351 item,
352 owner_id: _,
353 } = op
354 {
355 if let Some(conn_id) = item.conn_id() {
356 if self.item_exists_in_temp_schemas(conn_id, &name.item)
357 && !temporary_drops.contains(&(conn_id, name.item.clone()))
358 || creating.contains(&(conn_id, &name.item))
359 {
360 return Err(
361 SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
362 );
363 } else {
364 creating.insert((conn_id, &name.item));
365 temporary_ids.insert(id.clone());
366 }
367 }
368 }
369 }
370 Ok(temporary_ids)
371 }
372
373 #[instrument(name = "catalog::transact")]
374 pub async fn transact(
375 &mut self,
376 storage_collections: Option<
379 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
380 >,
381 oracle_write_ts: mz_repr::Timestamp,
382 session: Option<&ConnMeta>,
383 ops: Vec<Op>,
384 ) -> Result<TransactionResult, AdapterError> {
385 trace!("transact: {:?}", ops);
386 fail::fail_point!("catalog_transact", |arg| {
387 Err(AdapterError::Unstructured(anyhow::anyhow!(
388 "failpoint: {arg:?}"
389 )))
390 });
391
392 let drop_ids: BTreeSet<CatalogItemId> = ops
393 .iter()
394 .filter_map(|op| match op {
395 Op::DropObjects(drop_object_infos) => {
396 let ids = drop_object_infos.iter().map(|info| info.to_object_id());
397 let item_ids = ids.filter_map(|id| match id {
398 ObjectId::Item(id) => Some(id),
399 _ => None,
400 });
401 Some(item_ids)
402 }
403 _ => None,
404 })
405 .flatten()
406 .collect();
407 let temporary_drops = drop_ids
408 .iter()
409 .filter_map(|id| {
410 let entry = self.get_entry(id);
411 match entry.item().conn_id() {
412 Some(conn_id) => Some((conn_id, entry.name().item.clone())),
413 None => None,
414 }
415 })
416 .collect();
417 let dropped_global_ids = drop_ids
418 .iter()
419 .flat_map(|item_id| self.get_global_ids(item_id))
420 .collect();
421
422 let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
423 let mut builtin_table_updates = vec![];
424 let mut audit_events = vec![];
425 let mut storage = self.storage().await;
426 let mut tx = storage
427 .transaction()
428 .await
429 .unwrap_or_terminate("starting catalog transaction");
430
431 let new_state = Self::transact_inner(
432 storage_collections,
433 oracle_write_ts,
434 session,
435 ops,
436 temporary_ids,
437 &mut builtin_table_updates,
438 &mut audit_events,
439 &mut tx,
440 &self.state,
441 )
442 .await?;
443
444 tx.commit(oracle_write_ts)
449 .await
450 .unwrap_or_terminate("catalog storage transaction commit must succeed");
451
452 drop(storage);
455 if let Some(new_state) = new_state {
456 self.transient_revision += 1;
457 self.state = new_state;
458 }
459
460 let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
462 if self.state.system_config().enable_mz_notices() {
463 self.state().pack_optimizer_notices(
465 &mut builtin_table_updates,
466 dropped_notices.iter(),
467 Diff::MINUS_ONE,
468 );
469 }
470
471 Ok(TransactionResult {
472 builtin_table_updates,
473 audit_events,
474 })
475 }
476
477 #[instrument(name = "catalog::transact_inner")]
485 async fn transact_inner(
486 storage_collections: Option<
487 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
488 >,
489 oracle_write_ts: mz_repr::Timestamp,
490 session: Option<&ConnMeta>,
491 mut ops: Vec<Op>,
492 temporary_ids: BTreeSet<CatalogItemId>,
493 builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
494 audit_events: &mut Vec<VersionedEvent>,
495 tx: &mut Transaction<'_>,
496 state: &CatalogState,
497 ) -> Result<Option<CatalogState>, AdapterError> {
498 let mut state = Cow::Borrowed(state);
499
500 let dry_run_ops = match ops.last() {
501 Some(Op::TransactionDryRun) => {
502 ops.pop();
504 assert!(!ops.is_empty(), "TransactionDryRun must not be the only op");
505 ops.clone()
506 }
507 Some(_) => vec![],
508 None => return Ok(None),
509 };
510
511 let mut storage_collections_to_create = BTreeSet::new();
512 let mut storage_collections_to_drop = BTreeSet::new();
513 let mut storage_collections_to_register = BTreeMap::new();
514
515 for op in ops {
516 let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
517 oracle_write_ts,
518 session,
519 op,
520 &temporary_ids,
521 audit_events,
522 tx,
523 &*state,
524 &mut storage_collections_to_create,
525 &mut storage_collections_to_drop,
526 &mut storage_collections_to_register,
527 )
528 .await?;
529
530 if let Some(builtin_table_update) = weird_builtin_table_update {
539 builtin_table_updates.push(builtin_table_update);
540 }
541
542 let op_id = tx.op_id().into();
547 let temporary_item_updates =
548 temporary_item_updates
549 .into_iter()
550 .map(|(item, diff)| StateUpdate {
551 kind: StateUpdateKind::TemporaryItem(item),
552 ts: op_id,
553 diff,
554 });
555
556 let mut updates: Vec<_> = tx.get_and_commit_op_updates();
557 updates.extend(temporary_item_updates);
558 if !updates.is_empty() {
559 let op_builtin_table_updates = state.to_mut().apply_updates(updates)?;
560 let op_builtin_table_updates = state
561 .to_mut()
562 .resolve_builtin_table_updates(op_builtin_table_updates);
563 builtin_table_updates.extend(op_builtin_table_updates);
564 }
565 }
566
567 if dry_run_ops.is_empty() {
568 if let Some(c) = storage_collections {
570 c.prepare_state(
571 tx,
572 storage_collections_to_create,
573 storage_collections_to_drop,
574 storage_collections_to_register,
575 )
576 .await?;
577 }
578
579 let updates = tx.get_and_commit_op_updates();
580 if !updates.is_empty() {
581 let op_builtin_table_updates = state.to_mut().apply_updates(updates)?;
582 let op_builtin_table_updates = state
583 .to_mut()
584 .resolve_builtin_table_updates(op_builtin_table_updates);
585 builtin_table_updates.extend(op_builtin_table_updates);
586 }
587
588 match state {
589 Cow::Owned(state) => Ok(Some(state)),
590 Cow::Borrowed(_) => Ok(None),
591 }
592 } else {
593 Err(AdapterError::TransactionDryRun {
594 new_ops: dry_run_ops,
595 new_state: state.into_owned(),
596 })
597 }
598 }
599
600 #[instrument]
608 async fn transact_op(
609 oracle_write_ts: mz_repr::Timestamp,
610 session: Option<&ConnMeta>,
611 op: Op,
612 temporary_ids: &BTreeSet<CatalogItemId>,
613 audit_events: &mut Vec<VersionedEvent>,
614 tx: &mut Transaction<'_>,
615 state: &CatalogState,
616 storage_collections_to_create: &mut BTreeSet<GlobalId>,
617 storage_collections_to_drop: &mut BTreeSet<GlobalId>,
618 storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
619 ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
620 let mut weird_builtin_table_update = None;
621 let mut temporary_item_updates = Vec::new();
622
623 match op {
624 Op::TransactionDryRun => {
625 unreachable!("TransactionDryRun can only be used a final element of ops")
626 }
627 Op::AlterRetainHistory { id, value, window } => {
628 let entry = state.get_entry(&id);
629 if id.is_system() {
630 let name = entry.name();
631 let full_name =
632 state.resolve_full_name(name, session.map(|session| session.conn_id()));
633 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
634 full_name.to_string(),
635 ))));
636 }
637
638 let mut new_entry = entry.clone();
639 let previous = new_entry
640 .item
641 .update_retain_history(value.clone(), window)
642 .map_err(|_| {
643 AdapterError::Catalog(Error::new(ErrorKind::Internal(
644 "planner should have rejected invalid alter retain history item type"
645 .to_string(),
646 )))
647 })?;
648
649 if Self::should_audit_log_item(new_entry.item()) {
650 let details =
651 EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
652 id: id.to_string(),
653 old_history: previous.map(|previous| previous.to_string()),
654 new_history: value.map(|v| v.to_string()),
655 });
656 CatalogState::add_to_audit_log(
657 &state.system_configuration,
658 oracle_write_ts,
659 session,
660 tx,
661 audit_events,
662 EventType::Alter,
663 catalog_type_to_audit_object_type(new_entry.item().typ()),
664 details,
665 )?;
666 }
667
668 tx.update_item(id, new_entry.into())?;
669
670 Self::log_update(state, &id);
671 }
672 Op::AlterRole {
673 id,
674 name,
675 attributes,
676 nopassword,
677 vars,
678 } => {
679 state.ensure_not_reserved_role(&id)?;
680
681 let mut existing_role = state.get_role(&id).clone();
682 let password = attributes.password.clone();
683 let scram_iterations = attributes
684 .scram_iterations
685 .unwrap_or_else(|| state.system_config().scram_iterations());
686 existing_role.attributes = attributes.into();
687 existing_role.vars = vars;
688 let password_action = if nopassword {
689 PasswordAction::Clear
690 } else if let Some(password) = password {
691 PasswordAction::Set(PasswordConfig {
692 password,
693 scram_iterations,
694 })
695 } else {
696 PasswordAction::NoChange
697 };
698 tx.update_role(id, existing_role.into(), password_action)?;
699
700 CatalogState::add_to_audit_log(
701 &state.system_configuration,
702 oracle_write_ts,
703 session,
704 tx,
705 audit_events,
706 EventType::Alter,
707 ObjectType::Role,
708 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
709 id: id.to_string(),
710 name: name.clone(),
711 }),
712 )?;
713
714 info!("update role {name} ({id})");
715 }
716 Op::AlterNetworkPolicy {
717 id,
718 rules,
719 name,
720 owner_id: _owner_id,
721 } => {
722 let existing_policy = state.get_network_policy(&id).clone();
723 let mut policy: NetworkPolicy = existing_policy.into();
724 policy.rules = rules;
725 if is_reserved_name(&name) {
726 return Err(AdapterError::Catalog(Error::new(
727 ErrorKind::ReservedNetworkPolicyName(name),
728 )));
729 }
730 tx.update_network_policy(id, policy.clone())?;
731
732 CatalogState::add_to_audit_log(
733 &state.system_configuration,
734 oracle_write_ts,
735 session,
736 tx,
737 audit_events,
738 EventType::Alter,
739 ObjectType::NetworkPolicy,
740 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
741 id: id.to_string(),
742 name: name.clone(),
743 }),
744 )?;
745
746 info!("update network policy {name} ({id})");
747 }
748 Op::AlterAddColumn {
749 id,
750 new_global_id,
751 name,
752 typ,
753 sql,
754 } => {
755 let mut new_entry = state.get_entry(&id).clone();
756 let version = new_entry.item.add_column(name, typ, sql)?;
757 let shard_id = state
760 .storage_metadata()
761 .get_collection_shard(new_entry.latest_global_id())?;
762
763 let CatalogItem::Table(table) = &mut new_entry.item else {
765 return Err(AdapterError::Unsupported("adding columns to non-Table"));
766 };
767 table.collections.insert(version, new_global_id);
768
769 tx.update_item(id, new_entry.into())?;
770 storage_collections_to_register.insert(new_global_id, shard_id);
771 }
772 Op::CreateDatabase { name, owner_id } => {
773 let database_owner_privileges = vec![rbac::owner_privilege(
774 mz_sql::catalog::ObjectType::Database,
775 owner_id,
776 )];
777 let database_default_privileges = state
778 .default_privileges
779 .get_applicable_privileges(
780 owner_id,
781 None,
782 None,
783 mz_sql::catalog::ObjectType::Database,
784 )
785 .map(|item| item.mz_acl_item(owner_id));
786 let database_privileges: Vec<_> = merge_mz_acl_items(
787 database_owner_privileges
788 .into_iter()
789 .chain(database_default_privileges),
790 )
791 .collect();
792
793 let schema_owner_privileges = vec![rbac::owner_privilege(
794 mz_sql::catalog::ObjectType::Schema,
795 owner_id,
796 )];
797 let schema_default_privileges = state
798 .default_privileges
799 .get_applicable_privileges(
800 owner_id,
801 None,
802 None,
803 mz_sql::catalog::ObjectType::Schema,
804 )
805 .map(|item| item.mz_acl_item(owner_id))
806 .chain(std::iter::once(MzAclItem {
808 grantee: RoleId::Public,
809 grantor: owner_id,
810 acl_mode: AclMode::USAGE,
811 }));
812 let schema_privileges: Vec<_> = merge_mz_acl_items(
813 schema_owner_privileges
814 .into_iter()
815 .chain(schema_default_privileges),
816 )
817 .collect();
818
819 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
820 let (database_id, _) = tx.insert_user_database(
821 &name,
822 owner_id,
823 database_privileges.clone(),
824 &temporary_oids,
825 )?;
826 let (schema_id, _) = tx.insert_user_schema(
827 database_id,
828 DEFAULT_SCHEMA,
829 owner_id,
830 schema_privileges.clone(),
831 &temporary_oids,
832 )?;
833 CatalogState::add_to_audit_log(
834 &state.system_configuration,
835 oracle_write_ts,
836 session,
837 tx,
838 audit_events,
839 EventType::Create,
840 ObjectType::Database,
841 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
842 id: database_id.to_string(),
843 name: name.clone(),
844 }),
845 )?;
846 info!("create database {}", name);
847
848 CatalogState::add_to_audit_log(
849 &state.system_configuration,
850 oracle_write_ts,
851 session,
852 tx,
853 audit_events,
854 EventType::Create,
855 ObjectType::Schema,
856 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
857 id: schema_id.to_string(),
858 name: DEFAULT_SCHEMA.to_string(),
859 database_name: Some(name),
860 }),
861 )?;
862 }
863 Op::CreateSchema {
864 database_id,
865 schema_name,
866 owner_id,
867 } => {
868 if is_reserved_name(&schema_name) {
869 return Err(AdapterError::Catalog(Error::new(
870 ErrorKind::ReservedSchemaName(schema_name),
871 )));
872 }
873 let database_id = match database_id {
874 ResolvedDatabaseSpecifier::Id(id) => id,
875 ResolvedDatabaseSpecifier::Ambient => {
876 return Err(AdapterError::Catalog(Error::new(
877 ErrorKind::ReadOnlySystemSchema(schema_name),
878 )));
879 }
880 };
881 let owner_privileges = vec![rbac::owner_privilege(
882 mz_sql::catalog::ObjectType::Schema,
883 owner_id,
884 )];
885 let default_privileges = state
886 .default_privileges
887 .get_applicable_privileges(
888 owner_id,
889 Some(database_id),
890 None,
891 mz_sql::catalog::ObjectType::Schema,
892 )
893 .map(|item| item.mz_acl_item(owner_id));
894 let privileges: Vec<_> =
895 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
896 .collect();
897 let (schema_id, _) = tx.insert_user_schema(
898 database_id,
899 &schema_name,
900 owner_id,
901 privileges.clone(),
902 &state.get_temporary_oids().collect(),
903 )?;
904 CatalogState::add_to_audit_log(
905 &state.system_configuration,
906 oracle_write_ts,
907 session,
908 tx,
909 audit_events,
910 EventType::Create,
911 ObjectType::Schema,
912 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
913 id: schema_id.to_string(),
914 name: schema_name.clone(),
915 database_name: Some(state.database_by_id[&database_id].name.clone()),
916 }),
917 )?;
918 }
919 Op::CreateRole { name, attributes } => {
920 if is_reserved_role_name(&name) {
921 return Err(AdapterError::Catalog(Error::new(
922 ErrorKind::ReservedRoleName(name),
923 )));
924 }
925 let membership = RoleMembership::new();
926 let vars = RoleVars::default();
927 let (id, _) = tx.insert_user_role(
928 name.clone(),
929 attributes.clone(),
930 membership.clone(),
931 vars.clone(),
932 &state.get_temporary_oids().collect(),
933 )?;
934 CatalogState::add_to_audit_log(
935 &state.system_configuration,
936 oracle_write_ts,
937 session,
938 tx,
939 audit_events,
940 EventType::Create,
941 ObjectType::Role,
942 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
943 id: id.to_string(),
944 name: name.clone(),
945 }),
946 )?;
947 info!("create role {}", name);
948 }
949 Op::CreateCluster {
950 id,
951 name,
952 introspection_sources,
953 owner_id,
954 config,
955 } => {
956 if is_reserved_name(&name) {
957 return Err(AdapterError::Catalog(Error::new(
958 ErrorKind::ReservedClusterName(name),
959 )));
960 }
961 let owner_privileges = vec![rbac::owner_privilege(
962 mz_sql::catalog::ObjectType::Cluster,
963 owner_id,
964 )];
965 let default_privileges = state
966 .default_privileges
967 .get_applicable_privileges(
968 owner_id,
969 None,
970 None,
971 mz_sql::catalog::ObjectType::Cluster,
972 )
973 .map(|item| item.mz_acl_item(owner_id));
974 let privileges: Vec<_> =
975 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
976 .collect();
977 let introspection_source_ids: Vec<_> = introspection_sources
978 .iter()
979 .map(|introspection_source| {
980 Transaction::allocate_introspection_source_index_id(
981 &id,
982 introspection_source.variant,
983 )
984 })
985 .collect();
986
987 let introspection_sources = introspection_sources
988 .into_iter()
989 .zip_eq(introspection_source_ids)
990 .map(|(log, (item_id, gid))| (log, item_id, gid))
991 .collect();
992
993 tx.insert_user_cluster(
994 id,
995 &name,
996 introspection_sources,
997 owner_id,
998 privileges.clone(),
999 config.clone().into(),
1000 &state.get_temporary_oids().collect(),
1001 )?;
1002 CatalogState::add_to_audit_log(
1003 &state.system_configuration,
1004 oracle_write_ts,
1005 session,
1006 tx,
1007 audit_events,
1008 EventType::Create,
1009 ObjectType::Cluster,
1010 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1011 id: id.to_string(),
1012 name: name.clone(),
1013 }),
1014 )?;
1015 info!("create cluster {}", name);
1016 }
1017 Op::CreateClusterReplica {
1018 cluster_id,
1019 name,
1020 config,
1021 owner_id,
1022 reason,
1023 } => {
1024 if is_reserved_name(&name) {
1025 return Err(AdapterError::Catalog(Error::new(
1026 ErrorKind::ReservedReplicaName(name),
1027 )));
1028 }
1029 let cluster = state.get_cluster(cluster_id);
1030 let id =
1031 tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1032 if let ReplicaLocation::Managed(ManagedReplicaLocation {
1033 size,
1034 billed_as,
1035 internal,
1036 ..
1037 }) = &config.location
1038 {
1039 let (reason, scheduling_policies) = reason.into_audit_log();
1040 let details = EventDetails::CreateClusterReplicaV4(
1041 mz_audit_log::CreateClusterReplicaV4 {
1042 cluster_id: cluster_id.to_string(),
1043 cluster_name: cluster.name.clone(),
1044 replica_id: Some(id.to_string()),
1045 replica_name: name.clone(),
1046 logical_size: size.clone(),
1047 billed_as: billed_as.clone(),
1048 internal: *internal,
1049 reason,
1050 scheduling_policies,
1051 },
1052 );
1053 CatalogState::add_to_audit_log(
1054 &state.system_configuration,
1055 oracle_write_ts,
1056 session,
1057 tx,
1058 audit_events,
1059 EventType::Create,
1060 ObjectType::ClusterReplica,
1061 details,
1062 )?;
1063 }
1064 }
1065 Op::CreateItem {
1066 id,
1067 name,
1068 item,
1069 owner_id,
1070 } => {
1071 state.check_unstable_dependencies(&item)?;
1072
1073 match &item {
1074 CatalogItem::Table(table) => {
1075 let gids: Vec<_> = table.global_ids().collect();
1076 assert_eq!(gids.len(), 1);
1077 storage_collections_to_create.extend(gids);
1078 }
1079 CatalogItem::Source(source) => {
1080 storage_collections_to_create.insert(source.global_id());
1081 }
1082 CatalogItem::MaterializedView(mv) => {
1083 storage_collections_to_create.insert(mv.global_id_writes());
1084 }
1085 CatalogItem::ContinualTask(ct) => {
1086 storage_collections_to_create.insert(ct.global_id());
1087 }
1088 CatalogItem::Sink(sink) => {
1089 storage_collections_to_create.insert(sink.global_id());
1090 }
1091 CatalogItem::Log(_)
1092 | CatalogItem::View(_)
1093 | CatalogItem::Index(_)
1094 | CatalogItem::Type(_)
1095 | CatalogItem::Func(_)
1096 | CatalogItem::Secret(_)
1097 | CatalogItem::Connection(_) => (),
1098 }
1099
1100 let system_user = session.map_or(false, |s| s.user().is_system_user());
1101 if !system_user {
1102 if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1103 let cluster_name = state.clusters_by_id[&id].name.clone();
1104 return Err(AdapterError::Catalog(Error::new(
1105 ErrorKind::ReadOnlyCluster(cluster_name),
1106 )));
1107 }
1108 }
1109
1110 let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1111 let default_privileges = state
1112 .default_privileges
1113 .get_applicable_privileges(
1114 owner_id,
1115 name.qualifiers.database_spec.id(),
1116 Some(name.qualifiers.schema_spec.into()),
1117 item.typ().into(),
1118 )
1119 .map(|item| item.mz_acl_item(owner_id));
1120 let progress_source_privilege = if item.is_progress_source() {
1122 Some(MzAclItem {
1123 grantee: MZ_SUPPORT_ROLE_ID,
1124 grantor: owner_id,
1125 acl_mode: AclMode::SELECT,
1126 })
1127 } else {
1128 None
1129 };
1130 let privileges: Vec<_> = merge_mz_acl_items(
1131 owner_privileges
1132 .into_iter()
1133 .chain(default_privileges)
1134 .chain(progress_source_privilege),
1135 )
1136 .collect();
1137
1138 let temporary_oids = state.get_temporary_oids().collect();
1139
1140 if item.is_temporary() {
1141 if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1142 || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1143 {
1144 return Err(AdapterError::Catalog(Error::new(
1145 ErrorKind::InvalidTemporarySchema,
1146 )));
1147 }
1148 let oid = tx.allocate_oid(&temporary_oids)?;
1149 let item = TemporaryItem {
1150 id,
1151 oid,
1152 name: name.clone(),
1153 item: item.clone(),
1154 owner_id,
1155 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1156 };
1157 temporary_item_updates.push((item, StateDiff::Addition));
1158 } else {
1159 if let Some(temp_id) =
1160 item.uses()
1161 .iter()
1162 .find(|id| match state.try_get_entry(*id) {
1163 Some(entry) => entry.item().is_temporary(),
1164 None => temporary_ids.contains(id),
1165 })
1166 {
1167 let temp_item = state.get_entry(temp_id);
1168 return Err(AdapterError::Catalog(Error::new(
1169 ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1170 )));
1171 }
1172 if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1173 && !system_user
1174 {
1175 let schema_name = state
1176 .resolve_full_name(&name, session.map(|session| session.conn_id()))
1177 .schema;
1178 return Err(AdapterError::Catalog(Error::new(
1179 ErrorKind::ReadOnlySystemSchema(schema_name),
1180 )));
1181 }
1182 let schema_id = name.qualifiers.schema_spec.clone().into();
1183 let item_type = item.typ();
1184 let (create_sql, global_id, versions) = item.to_serialized();
1185 tx.insert_user_item(
1186 id,
1187 global_id,
1188 schema_id,
1189 &name.item,
1190 create_sql,
1191 owner_id,
1192 privileges.clone(),
1193 &temporary_oids,
1194 versions,
1195 )?;
1196 info!(
1197 "create {} {} ({})",
1198 item_type,
1199 state.resolve_full_name(&name, None),
1200 id
1201 );
1202 }
1203
1204 if Self::should_audit_log_item(&item) {
1205 let name = Self::full_name_detail(
1206 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1207 );
1208 let details = match &item {
1209 CatalogItem::Source(s) => {
1210 let cluster_id = match s.data_source {
1211 DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1214 match state.get_entry(&ingestion_id).cluster_id() {
1215 Some(cluster_id) => Some(cluster_id.to_string()),
1216 None => None,
1217 }
1218 }
1219 _ => match item.cluster_id() {
1220 Some(cluster_id) => Some(cluster_id.to_string()),
1221 None => None,
1222 },
1223 };
1224
1225 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1226 id: id.to_string(),
1227 cluster_id,
1228 name,
1229 external_type: s.source_type().to_string(),
1230 })
1231 }
1232 CatalogItem::Sink(s) => {
1233 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1234 id: id.to_string(),
1235 cluster_id: Some(s.cluster_id.to_string()),
1236 name,
1237 external_type: s.sink_type().to_string(),
1238 })
1239 }
1240 CatalogItem::Index(i) => {
1241 EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1242 id: id.to_string(),
1243 name,
1244 cluster_id: i.cluster_id.to_string(),
1245 })
1246 }
1247 CatalogItem::MaterializedView(mv) => {
1248 EventDetails::CreateMaterializedViewV1(
1249 mz_audit_log::CreateMaterializedViewV1 {
1250 id: id.to_string(),
1251 name,
1252 cluster_id: mv.cluster_id.to_string(),
1253 },
1254 )
1255 }
1256 _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1257 id: id.to_string(),
1258 name,
1259 }),
1260 };
1261 CatalogState::add_to_audit_log(
1262 &state.system_configuration,
1263 oracle_write_ts,
1264 session,
1265 tx,
1266 audit_events,
1267 EventType::Create,
1268 catalog_type_to_audit_object_type(item.typ()),
1269 details,
1270 )?;
1271 }
1272 }
1273 Op::CreateNetworkPolicy {
1274 rules,
1275 name,
1276 owner_id,
1277 } => {
1278 if state.network_policies_by_name.contains_key(&name) {
1279 return Err(AdapterError::PlanError(PlanError::Catalog(
1280 SqlCatalogError::NetworkPolicyAlreadyExists(name),
1281 )));
1282 }
1283 if is_reserved_name(&name) {
1284 return Err(AdapterError::Catalog(Error::new(
1285 ErrorKind::ReservedNetworkPolicyName(name),
1286 )));
1287 }
1288
1289 let owner_privileges = vec![rbac::owner_privilege(
1290 mz_sql::catalog::ObjectType::NetworkPolicy,
1291 owner_id,
1292 )];
1293 let default_privileges = state
1294 .default_privileges
1295 .get_applicable_privileges(
1296 owner_id,
1297 None,
1298 None,
1299 mz_sql::catalog::ObjectType::NetworkPolicy,
1300 )
1301 .map(|item| item.mz_acl_item(owner_id));
1302 let privileges: Vec<_> =
1303 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1304 .collect();
1305
1306 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1307 let id = tx.insert_user_network_policy(
1308 name.clone(),
1309 rules,
1310 privileges,
1311 owner_id,
1312 &temporary_oids,
1313 )?;
1314
1315 CatalogState::add_to_audit_log(
1316 &state.system_configuration,
1317 oracle_write_ts,
1318 session,
1319 tx,
1320 audit_events,
1321 EventType::Create,
1322 ObjectType::NetworkPolicy,
1323 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1324 id: id.to_string(),
1325 name: name.clone(),
1326 }),
1327 )?;
1328
1329 info!("created network policy {name} ({id})");
1330 }
1331 Op::Comment {
1332 object_id,
1333 sub_component,
1334 comment,
1335 } => {
1336 tx.update_comment(object_id, sub_component, comment)?;
1337 let entry = state.get_comment_id_entry(&object_id);
1338 let should_log = entry
1339 .map(|entry| Self::should_audit_log_item(entry.item()))
1340 .unwrap_or(true);
1342 if let (Some(conn_id), true) =
1345 (session.map(|session| session.conn_id()), should_log)
1346 {
1347 CatalogState::add_to_audit_log(
1348 &state.system_configuration,
1349 oracle_write_ts,
1350 session,
1351 tx,
1352 audit_events,
1353 EventType::Comment,
1354 comment_id_to_audit_object_type(object_id),
1355 EventDetails::IdNameV1(IdNameV1 {
1356 id: format!("{object_id:?}"),
1358 name: state.comment_id_to_audit_log_name(object_id, conn_id),
1359 }),
1360 )?;
1361 }
1362 }
1363 Op::UpdateSourceReferences {
1364 source_id,
1365 references,
1366 } => {
1367 tx.update_source_references(
1368 source_id,
1369 references
1370 .references
1371 .into_iter()
1372 .map(|reference| reference.into())
1373 .collect(),
1374 references.updated_at,
1375 )?;
1376 }
1377 Op::DropObjects(drop_object_infos) => {
1378 let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1380
1381 tx.drop_comments(&delta.comments)?;
1383
1384 let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1386 delta
1387 .items
1388 .iter()
1389 .map(|id| id)
1390 .partition(|id| !state.get_entry(*id).item().is_temporary());
1391 tx.remove_items(&durable_items_to_drop)?;
1392 temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1393 let entry = state.get_entry(&id);
1394 (entry.clone().into(), StateDiff::Retraction)
1395 }));
1396
1397 for item_id in delta.items {
1398 let entry = state.get_entry(&item_id);
1399
1400 if entry.item().is_storage_collection() {
1401 storage_collections_to_drop.extend(entry.global_ids());
1402 }
1403
1404 if state.source_references.contains_key(&item_id) {
1405 tx.remove_source_references(item_id)?;
1406 }
1407
1408 if Self::should_audit_log_item(entry.item()) {
1409 CatalogState::add_to_audit_log(
1410 &state.system_configuration,
1411 oracle_write_ts,
1412 session,
1413 tx,
1414 audit_events,
1415 EventType::Drop,
1416 catalog_type_to_audit_object_type(entry.item().typ()),
1417 EventDetails::IdFullNameV1(IdFullNameV1 {
1418 id: item_id.to_string(),
1419 name: Self::full_name_detail(&state.resolve_full_name(
1420 entry.name(),
1421 session.map(|session| session.conn_id()),
1422 )),
1423 }),
1424 )?;
1425 }
1426 info!(
1427 "drop {} {} ({})",
1428 entry.item_type(),
1429 state.resolve_full_name(entry.name(), entry.conn_id()),
1430 item_id
1431 );
1432 }
1433
1434 let schemas = delta
1436 .schemas
1437 .iter()
1438 .map(|(schema_spec, database_spec)| {
1439 (SchemaId::from(schema_spec), *database_spec)
1440 })
1441 .collect();
1442 tx.remove_schemas(&schemas)?;
1443
1444 for (schema_spec, database_spec) in delta.schemas {
1445 let schema = state.get_schema(
1446 &database_spec,
1447 &schema_spec,
1448 session
1449 .map(|session| session.conn_id())
1450 .unwrap_or(&SYSTEM_CONN_ID),
1451 );
1452
1453 let schema_id = SchemaId::from(schema_spec);
1454 let database_id = match database_spec {
1455 ResolvedDatabaseSpecifier::Ambient => None,
1456 ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1457 };
1458
1459 CatalogState::add_to_audit_log(
1460 &state.system_configuration,
1461 oracle_write_ts,
1462 session,
1463 tx,
1464 audit_events,
1465 EventType::Drop,
1466 ObjectType::Schema,
1467 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1468 id: schema_id.to_string(),
1469 name: schema.name.schema.to_string(),
1470 database_name: database_id
1471 .map(|database_id| state.database_by_id[&database_id].name.clone()),
1472 }),
1473 )?;
1474 }
1475
1476 tx.remove_databases(&delta.databases)?;
1478
1479 for database_id in delta.databases {
1480 let database = state.get_database(&database_id).clone();
1481
1482 CatalogState::add_to_audit_log(
1483 &state.system_configuration,
1484 oracle_write_ts,
1485 session,
1486 tx,
1487 audit_events,
1488 EventType::Drop,
1489 ObjectType::Database,
1490 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1491 id: database_id.to_string(),
1492 name: database.name.clone(),
1493 }),
1494 )?;
1495 }
1496
1497 tx.remove_user_roles(&delta.roles)?;
1499
1500 for role_id in delta.roles {
1501 let role = state
1502 .roles_by_id
1503 .get(&role_id)
1504 .expect("catalog out of sync");
1505
1506 CatalogState::add_to_audit_log(
1507 &state.system_configuration,
1508 oracle_write_ts,
1509 session,
1510 tx,
1511 audit_events,
1512 EventType::Drop,
1513 ObjectType::Role,
1514 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1515 id: role.id.to_string(),
1516 name: role.name.clone(),
1517 }),
1518 )?;
1519 info!("drop role {}", role.name());
1520 }
1521
1522 tx.remove_network_policies(&delta.network_policies)?;
1524
1525 for network_policy_id in delta.network_policies {
1526 let policy = state
1527 .network_policies_by_id
1528 .get(&network_policy_id)
1529 .expect("catalog out of sync");
1530
1531 CatalogState::add_to_audit_log(
1532 &state.system_configuration,
1533 oracle_write_ts,
1534 session,
1535 tx,
1536 audit_events,
1537 EventType::Drop,
1538 ObjectType::NetworkPolicy,
1539 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1540 id: policy.id.to_string(),
1541 name: policy.name.clone(),
1542 }),
1543 )?;
1544 info!("drop network policy {}", policy.name.clone());
1545 }
1546
1547 let replicas = delta.replicas.keys().copied().collect();
1549 tx.remove_cluster_replicas(&replicas)?;
1550
1551 for (replica_id, (cluster_id, reason)) in delta.replicas {
1552 let cluster = state.get_cluster(cluster_id);
1553 let replica = cluster.replica(replica_id).expect("Must exist");
1554
1555 let (reason, scheduling_policies) = reason.into_audit_log();
1556 let details =
1557 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1558 cluster_id: cluster_id.to_string(),
1559 cluster_name: cluster.name.clone(),
1560 replica_id: Some(replica_id.to_string()),
1561 replica_name: replica.name.clone(),
1562 reason,
1563 scheduling_policies,
1564 });
1565 CatalogState::add_to_audit_log(
1566 &state.system_configuration,
1567 oracle_write_ts,
1568 session,
1569 tx,
1570 audit_events,
1571 EventType::Drop,
1572 ObjectType::ClusterReplica,
1573 details,
1574 )?;
1575 }
1576
1577 tx.remove_clusters(&delta.clusters)?;
1579
1580 for cluster_id in delta.clusters {
1581 let cluster = state.get_cluster(cluster_id);
1582
1583 CatalogState::add_to_audit_log(
1584 &state.system_configuration,
1585 oracle_write_ts,
1586 session,
1587 tx,
1588 audit_events,
1589 EventType::Drop,
1590 ObjectType::Cluster,
1591 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1592 id: cluster.id.to_string(),
1593 name: cluster.name.clone(),
1594 }),
1595 )?;
1596 }
1597 }
1598 Op::GrantRole {
1599 role_id,
1600 member_id,
1601 grantor_id,
1602 } => {
1603 state.ensure_not_reserved_role(&member_id)?;
1604 state.ensure_grantable_role(&role_id)?;
1605 if state.collect_role_membership(&role_id).contains(&member_id) {
1606 let group_role = state.get_role(&role_id);
1607 let member_role = state.get_role(&member_id);
1608 return Err(AdapterError::Catalog(Error::new(
1609 ErrorKind::CircularRoleMembership {
1610 role_name: group_role.name().to_string(),
1611 member_name: member_role.name().to_string(),
1612 },
1613 )));
1614 }
1615 let mut member_role = state.get_role(&member_id).clone();
1616 member_role.membership.map.insert(role_id, grantor_id);
1617 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1618
1619 CatalogState::add_to_audit_log(
1620 &state.system_configuration,
1621 oracle_write_ts,
1622 session,
1623 tx,
1624 audit_events,
1625 EventType::Grant,
1626 ObjectType::Role,
1627 EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1628 role_id: role_id.to_string(),
1629 member_id: member_id.to_string(),
1630 grantor_id: grantor_id.to_string(),
1631 executed_by: session
1632 .map(|session| session.authenticated_role_id())
1633 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1634 .to_string(),
1635 }),
1636 )?;
1637 }
1638 Op::RevokeRole {
1639 role_id,
1640 member_id,
1641 grantor_id,
1642 } => {
1643 state.ensure_not_reserved_role(&member_id)?;
1644 state.ensure_grantable_role(&role_id)?;
1645 let mut member_role = state.get_role(&member_id).clone();
1646 member_role.membership.map.remove(&role_id);
1647 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1648
1649 CatalogState::add_to_audit_log(
1650 &state.system_configuration,
1651 oracle_write_ts,
1652 session,
1653 tx,
1654 audit_events,
1655 EventType::Revoke,
1656 ObjectType::Role,
1657 EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1658 role_id: role_id.to_string(),
1659 member_id: member_id.to_string(),
1660 grantor_id: grantor_id.to_string(),
1661 executed_by: session
1662 .map(|session| session.authenticated_role_id())
1663 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1664 .to_string(),
1665 }),
1666 )?;
1667 }
1668 Op::UpdatePrivilege {
1669 target_id,
1670 privilege,
1671 variant,
1672 } => {
1673 let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1674 UpdatePrivilegeVariant::Grant => {
1675 privileges.grant(privilege);
1676 }
1677 UpdatePrivilegeVariant::Revoke => {
1678 privileges.revoke(&privilege);
1679 }
1680 };
1681 match &target_id {
1682 SystemObjectId::Object(object_id) => match object_id {
1683 ObjectId::Cluster(id) => {
1684 let mut cluster = state.get_cluster(*id).clone();
1685 update_privilege_fn(&mut cluster.privileges);
1686 tx.update_cluster(*id, cluster.into())?;
1687 }
1688 ObjectId::Database(id) => {
1689 let mut database = state.get_database(id).clone();
1690 update_privilege_fn(&mut database.privileges);
1691 tx.update_database(*id, database.into())?;
1692 }
1693 ObjectId::NetworkPolicy(id) => {
1694 let mut policy = state.get_network_policy(id).clone();
1695 update_privilege_fn(&mut policy.privileges);
1696 tx.update_network_policy(*id, policy.into())?;
1697 }
1698 ObjectId::Schema((database_spec, schema_spec)) => {
1699 let schema_id = schema_spec.clone().into();
1700 let mut schema = state
1701 .get_schema(
1702 database_spec,
1703 schema_spec,
1704 session
1705 .map(|session| session.conn_id())
1706 .unwrap_or(&SYSTEM_CONN_ID),
1707 )
1708 .clone();
1709 update_privilege_fn(&mut schema.privileges);
1710 tx.update_schema(schema_id, schema.into())?;
1711 }
1712 ObjectId::Item(id) => {
1713 let entry = state.get_entry(id);
1714 let mut new_entry = entry.clone();
1715 update_privilege_fn(&mut new_entry.privileges);
1716 if !new_entry.item().is_temporary() {
1717 tx.update_item(*id, new_entry.into())?;
1718 } else {
1719 temporary_item_updates
1720 .push((entry.clone().into(), StateDiff::Retraction));
1721 temporary_item_updates
1722 .push((new_entry.into(), StateDiff::Addition));
1723 }
1724 }
1725 ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
1726 },
1727 SystemObjectId::System => {
1728 let mut system_privileges = state.system_privileges.clone();
1729 update_privilege_fn(&mut system_privileges);
1730 let new_privilege =
1731 system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
1732 tx.set_system_privilege(
1733 privilege.grantee,
1734 privilege.grantor,
1735 new_privilege.map(|new_privilege| new_privilege.acl_mode),
1736 )?;
1737 }
1738 }
1739 let object_type = state.get_system_object_type(&target_id);
1740 let object_id_str = match &target_id {
1741 SystemObjectId::System => "SYSTEM".to_string(),
1742 SystemObjectId::Object(id) => id.to_string(),
1743 };
1744 CatalogState::add_to_audit_log(
1745 &state.system_configuration,
1746 oracle_write_ts,
1747 session,
1748 tx,
1749 audit_events,
1750 variant.into(),
1751 system_object_type_to_audit_object_type(&object_type),
1752 EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
1753 object_id: object_id_str,
1754 grantee_id: privilege.grantee.to_string(),
1755 grantor_id: privilege.grantor.to_string(),
1756 privileges: privilege.acl_mode.to_string(),
1757 }),
1758 )?;
1759 }
1760 Op::UpdateDefaultPrivilege {
1761 privilege_object,
1762 privilege_acl_item,
1763 variant,
1764 } => {
1765 let mut default_privileges = state.default_privileges.clone();
1766 match variant {
1767 UpdatePrivilegeVariant::Grant => default_privileges
1768 .grant(privilege_object.clone(), privilege_acl_item.clone()),
1769 UpdatePrivilegeVariant::Revoke => {
1770 default_privileges.revoke(&privilege_object, &privilege_acl_item)
1771 }
1772 }
1773 let new_acl_mode = default_privileges
1774 .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
1775 tx.set_default_privilege(
1776 privilege_object.role_id,
1777 privilege_object.database_id,
1778 privilege_object.schema_id,
1779 privilege_object.object_type,
1780 privilege_acl_item.grantee,
1781 new_acl_mode.cloned(),
1782 )?;
1783 CatalogState::add_to_audit_log(
1784 &state.system_configuration,
1785 oracle_write_ts,
1786 session,
1787 tx,
1788 audit_events,
1789 variant.into(),
1790 object_type_to_audit_object_type(privilege_object.object_type),
1791 EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
1792 role_id: privilege_object.role_id.to_string(),
1793 database_id: privilege_object.database_id.map(|id| id.to_string()),
1794 schema_id: privilege_object.schema_id.map(|id| id.to_string()),
1795 grantee_id: privilege_acl_item.grantee.to_string(),
1796 privileges: privilege_acl_item.acl_mode.to_string(),
1797 }),
1798 )?;
1799 }
1800 Op::RenameCluster {
1801 id,
1802 name,
1803 to_name,
1804 check_reserved_names,
1805 } => {
1806 if id.is_system() {
1807 return Err(AdapterError::Catalog(Error::new(
1808 ErrorKind::ReadOnlyCluster(name.clone()),
1809 )));
1810 }
1811 if check_reserved_names && is_reserved_name(&to_name) {
1812 return Err(AdapterError::Catalog(Error::new(
1813 ErrorKind::ReservedClusterName(to_name),
1814 )));
1815 }
1816 tx.rename_cluster(id, &name, &to_name)?;
1817 CatalogState::add_to_audit_log(
1818 &state.system_configuration,
1819 oracle_write_ts,
1820 session,
1821 tx,
1822 audit_events,
1823 EventType::Alter,
1824 ObjectType::Cluster,
1825 EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
1826 id: id.to_string(),
1827 old_name: name.clone(),
1828 new_name: to_name.clone(),
1829 }),
1830 )?;
1831 info!("rename cluster {name} to {to_name}");
1832 }
1833 Op::RenameClusterReplica {
1834 cluster_id,
1835 replica_id,
1836 name,
1837 to_name,
1838 } => {
1839 if is_reserved_name(&to_name) {
1840 return Err(AdapterError::Catalog(Error::new(
1841 ErrorKind::ReservedReplicaName(to_name),
1842 )));
1843 }
1844 tx.rename_cluster_replica(replica_id, &name, &to_name)?;
1845 CatalogState::add_to_audit_log(
1846 &state.system_configuration,
1847 oracle_write_ts,
1848 session,
1849 tx,
1850 audit_events,
1851 EventType::Alter,
1852 ObjectType::ClusterReplica,
1853 EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
1854 cluster_id: cluster_id.to_string(),
1855 replica_id: replica_id.to_string(),
1856 old_name: name.replica.as_str().to_string(),
1857 new_name: to_name.clone(),
1858 }),
1859 )?;
1860 info!("rename cluster replica {name} to {to_name}");
1861 }
1862 Op::RenameItem {
1863 id,
1864 to_name,
1865 current_full_name,
1866 } => {
1867 let mut updates = Vec::new();
1868
1869 let entry = state.get_entry(&id);
1870 if let CatalogItem::Type(_) = entry.item() {
1871 return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
1872 current_full_name.to_string(),
1873 ))));
1874 }
1875
1876 if entry.id().is_system() {
1877 let name = state
1878 .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
1879 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
1880 name.to_string(),
1881 ))));
1882 }
1883
1884 let mut to_full_name = current_full_name.clone();
1885 to_full_name.item.clone_from(&to_name);
1886
1887 let mut to_qualified_name = entry.name().clone();
1888 to_qualified_name.item.clone_from(&to_name);
1889
1890 let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
1891 id: id.to_string(),
1892 old_name: Self::full_name_detail(¤t_full_name),
1893 new_name: Self::full_name_detail(&to_full_name),
1894 });
1895 if Self::should_audit_log_item(entry.item()) {
1896 CatalogState::add_to_audit_log(
1897 &state.system_configuration,
1898 oracle_write_ts,
1899 session,
1900 tx,
1901 audit_events,
1902 EventType::Alter,
1903 catalog_type_to_audit_object_type(entry.item().typ()),
1904 details,
1905 )?;
1906 }
1907
1908 let mut new_entry = entry.clone();
1910 new_entry.name.item.clone_from(&to_name);
1911 new_entry.item = entry
1912 .item()
1913 .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
1914 .map_err(|e| {
1915 Error::new(ErrorKind::from(AmbiguousRename {
1916 depender: state
1917 .resolve_full_name(entry.name(), entry.conn_id())
1918 .to_string(),
1919 dependee: state
1920 .resolve_full_name(entry.name(), entry.conn_id())
1921 .to_string(),
1922 message: e,
1923 }))
1924 })?;
1925
1926 for id in entry.referenced_by() {
1927 let dependent_item = state.get_entry(id);
1928 let mut to_entry = dependent_item.clone();
1929 to_entry.item = dependent_item
1930 .item()
1931 .rename_item_refs(
1932 current_full_name.clone(),
1933 to_full_name.item.clone(),
1934 false,
1935 )
1936 .map_err(|e| {
1937 Error::new(ErrorKind::from(AmbiguousRename {
1938 depender: state
1939 .resolve_full_name(
1940 dependent_item.name(),
1941 dependent_item.conn_id(),
1942 )
1943 .to_string(),
1944 dependee: state
1945 .resolve_full_name(entry.name(), entry.conn_id())
1946 .to_string(),
1947 message: e,
1948 }))
1949 })?;
1950
1951 if !to_entry.item().is_temporary() {
1952 tx.update_item(*id, to_entry.into())?;
1953 } else {
1954 temporary_item_updates
1955 .push((dependent_item.clone().into(), StateDiff::Retraction));
1956 temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
1957 }
1958 updates.push(*id);
1959 }
1960 if !new_entry.item().is_temporary() {
1961 tx.update_item(id, new_entry.into())?;
1962 } else {
1963 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
1964 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
1965 }
1966
1967 updates.push(id);
1968 for id in updates {
1969 Self::log_update(state, &id);
1970 }
1971 }
1972 Op::RenameSchema {
1973 database_spec,
1974 schema_spec,
1975 new_name,
1976 check_reserved_names,
1977 } => {
1978 if check_reserved_names && is_reserved_name(&new_name) {
1979 return Err(AdapterError::Catalog(Error::new(
1980 ErrorKind::ReservedSchemaName(new_name),
1981 )));
1982 }
1983
1984 let conn_id = session
1985 .map(|session| session.conn_id())
1986 .unwrap_or(&SYSTEM_CONN_ID);
1987
1988 let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
1989 let cur_name = schema.name().schema.clone();
1990
1991 let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
1992 return Err(AdapterError::Catalog(Error::new(
1993 ErrorKind::AmbientSchemaRename(cur_name),
1994 )));
1995 };
1996 let database = state.get_database(&database_id);
1997 let database_name = &database.name;
1998
1999 let mut updates = Vec::new();
2000 let mut items_to_update = BTreeMap::new();
2001
2002 let mut update_item = |id| {
2003 if items_to_update.contains_key(id) {
2004 return Ok(());
2005 }
2006
2007 let entry = state.get_entry(id);
2008
2009 let mut new_entry = entry.clone();
2011 new_entry.item = entry
2012 .item
2013 .rename_schema_refs(database_name, &cur_name, &new_name)
2014 .map_err(|(s, _i)| {
2015 Error::new(ErrorKind::from(AmbiguousRename {
2016 depender: state
2017 .resolve_full_name(entry.name(), entry.conn_id())
2018 .to_string(),
2019 dependee: format!("{database_name}.{cur_name}"),
2020 message: format!("ambiguous reference to schema named {s}"),
2021 }))
2022 })?;
2023
2024 if !new_entry.item().is_temporary() {
2026 items_to_update.insert(*id, new_entry.into());
2027 } else {
2028 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2029 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2030 }
2031 updates.push(id);
2032
2033 Ok::<_, AdapterError>(())
2034 };
2035
2036 for (_name, item_id) in &schema.items {
2038 update_item(item_id)?;
2040
2041 for id in state.get_entry(item_id).referenced_by() {
2043 update_item(id)?;
2044 }
2045 }
2046 tx.update_items(items_to_update)?;
2049
2050 let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2052 let schema_name = schema.name().schema.clone();
2053 return Err(AdapterError::Catalog(crate::catalog::Error::new(
2054 crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2055 )));
2056 };
2057
2058 let database_name = database_spec
2060 .id()
2061 .map(|id| state.get_database(&id).name.clone());
2062 let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2063 id: schema_id.to_string(),
2064 old_name: schema.name().schema.clone(),
2065 new_name: new_name.clone(),
2066 database_name,
2067 });
2068 CatalogState::add_to_audit_log(
2069 &state.system_configuration,
2070 oracle_write_ts,
2071 session,
2072 tx,
2073 audit_events,
2074 EventType::Alter,
2075 mz_audit_log::ObjectType::Schema,
2076 details,
2077 )?;
2078
2079 let mut new_schema = schema.clone();
2081 new_schema.name.schema.clone_from(&new_name);
2082 tx.update_schema(schema_id, new_schema.into())?;
2083
2084 for id in updates {
2085 Self::log_update(state, id);
2086 }
2087 }
2088 Op::UpdateOwner { id, new_owner } => {
2089 let conn_id = session
2090 .map(|session| session.conn_id())
2091 .unwrap_or(&SYSTEM_CONN_ID);
2092 let old_owner = state
2093 .get_owner_id(&id, conn_id)
2094 .expect("cannot update the owner of an object without an owner");
2095 match &id {
2096 ObjectId::Cluster(id) => {
2097 let mut cluster = state.get_cluster(*id).clone();
2098 if id.is_system() {
2099 return Err(AdapterError::Catalog(Error::new(
2100 ErrorKind::ReadOnlyCluster(cluster.name),
2101 )));
2102 }
2103 Self::update_privilege_owners(
2104 &mut cluster.privileges,
2105 cluster.owner_id,
2106 new_owner,
2107 );
2108 cluster.owner_id = new_owner;
2109 tx.update_cluster(*id, cluster.into())?;
2110 }
2111 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2112 let cluster = state.get_cluster(*cluster_id);
2113 let mut replica = cluster
2114 .replica(*replica_id)
2115 .expect("catalog out of sync")
2116 .clone();
2117 if replica_id.is_system() {
2118 return Err(AdapterError::Catalog(Error::new(
2119 ErrorKind::ReadOnlyClusterReplica(replica.name),
2120 )));
2121 }
2122 replica.owner_id = new_owner;
2123 tx.update_cluster_replica(*replica_id, replica.into())?;
2124 }
2125 ObjectId::Database(id) => {
2126 let mut database = state.get_database(id).clone();
2127 if id.is_system() {
2128 return Err(AdapterError::Catalog(Error::new(
2129 ErrorKind::ReadOnlyDatabase(database.name),
2130 )));
2131 }
2132 Self::update_privilege_owners(
2133 &mut database.privileges,
2134 database.owner_id,
2135 new_owner,
2136 );
2137 database.owner_id = new_owner;
2138 tx.update_database(*id, database.clone().into())?;
2139 }
2140 ObjectId::Schema((database_spec, schema_spec)) => {
2141 let schema_id: SchemaId = schema_spec.clone().into();
2142 let mut schema = state
2143 .get_schema(database_spec, schema_spec, conn_id)
2144 .clone();
2145 if schema_id.is_system() {
2146 let name = schema.name();
2147 let full_name = state.resolve_full_schema_name(name);
2148 return Err(AdapterError::Catalog(Error::new(
2149 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2150 )));
2151 }
2152 Self::update_privilege_owners(
2153 &mut schema.privileges,
2154 schema.owner_id,
2155 new_owner,
2156 );
2157 schema.owner_id = new_owner;
2158 tx.update_schema(schema_id, schema.into())?;
2159 }
2160 ObjectId::Item(id) => {
2161 let entry = state.get_entry(id);
2162 let mut new_entry = entry.clone();
2163 if id.is_system() {
2164 let full_name = state.resolve_full_name(
2165 new_entry.name(),
2166 session.map(|session| session.conn_id()),
2167 );
2168 return Err(AdapterError::Catalog(Error::new(
2169 ErrorKind::ReadOnlyItem(full_name.to_string()),
2170 )));
2171 }
2172 Self::update_privilege_owners(
2173 &mut new_entry.privileges,
2174 new_entry.owner_id,
2175 new_owner,
2176 );
2177 new_entry.owner_id = new_owner;
2178 if !new_entry.item().is_temporary() {
2179 tx.update_item(*id, new_entry.into())?;
2180 } else {
2181 temporary_item_updates
2182 .push((entry.clone().into(), StateDiff::Retraction));
2183 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2184 }
2185 }
2186 ObjectId::NetworkPolicy(id) => {
2187 let mut policy = state.get_network_policy(id).clone();
2188 if id.is_system() {
2189 return Err(AdapterError::Catalog(Error::new(
2190 ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2191 )));
2192 }
2193 Self::update_privilege_owners(
2194 &mut policy.privileges,
2195 policy.owner_id,
2196 new_owner,
2197 );
2198 policy.owner_id = new_owner;
2199 tx.update_network_policy(*id, policy.into())?;
2200 }
2201 ObjectId::Role(_) => unreachable!("roles have no owner"),
2202 }
2203 let object_type = state.get_object_type(&id);
2204 CatalogState::add_to_audit_log(
2205 &state.system_configuration,
2206 oracle_write_ts,
2207 session,
2208 tx,
2209 audit_events,
2210 EventType::Alter,
2211 object_type_to_audit_object_type(object_type),
2212 EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2213 object_id: id.to_string(),
2214 old_owner_id: old_owner.to_string(),
2215 new_owner_id: new_owner.to_string(),
2216 }),
2217 )?;
2218 }
2219 Op::UpdateClusterConfig { id, name, config } => {
2220 let mut cluster = state.get_cluster(id).clone();
2221 cluster.config = config;
2222 tx.update_cluster(id, cluster.into())?;
2223 info!("update cluster {}", name);
2224
2225 CatalogState::add_to_audit_log(
2226 &state.system_configuration,
2227 oracle_write_ts,
2228 session,
2229 tx,
2230 audit_events,
2231 EventType::Alter,
2232 ObjectType::Cluster,
2233 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2234 id: id.to_string(),
2235 name,
2236 }),
2237 )?;
2238 }
2239 Op::UpdateClusterReplicaConfig {
2240 replica_id,
2241 cluster_id,
2242 config,
2243 } => {
2244 let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2245 info!("update replica {}", replica.name);
2246 tx.update_cluster_replica(
2247 replica_id,
2248 mz_catalog::durable::ClusterReplica {
2249 cluster_id,
2250 replica_id,
2251 name: replica.name.clone(),
2252 config: config.clone().into(),
2253 owner_id: replica.owner_id,
2254 },
2255 )?;
2256 }
2257 Op::UpdateItem { id, name, to_item } => {
2258 let mut entry = state.get_entry(&id).clone();
2259 entry.name = name.clone();
2260 entry.item = to_item.clone();
2261 tx.update_item(id, entry.into())?;
2262
2263 if Self::should_audit_log_item(&to_item) {
2264 let mut full_name = Self::full_name_detail(
2265 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2266 );
2267 full_name.item = name.item;
2268
2269 CatalogState::add_to_audit_log(
2270 &state.system_configuration,
2271 oracle_write_ts,
2272 session,
2273 tx,
2274 audit_events,
2275 EventType::Alter,
2276 catalog_type_to_audit_object_type(to_item.typ()),
2277 EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2278 id: id.to_string(),
2279 name: full_name,
2280 }),
2281 )?;
2282 }
2283
2284 Self::log_update(state, &id);
2285 }
2286 Op::UpdateSystemConfiguration { name, value } => {
2287 let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2288 tx.upsert_system_config(&name, parsed_value.clone())?;
2289 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2294 let with_0dt_deployment_max_wait =
2295 Duration::parse(VarInput::Flat(&parsed_value))
2296 .expect("parsing succeeded above");
2297 tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2298 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2299 let with_0dt_deployment_ddl_check_interval =
2300 Duration::parse(VarInput::Flat(&parsed_value))
2301 .expect("parsing succeeded above");
2302 tx.set_0dt_deployment_ddl_check_interval(
2303 with_0dt_deployment_ddl_check_interval,
2304 )?;
2305 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2306 let panic_after_timeout =
2307 strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2308 tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2309 }
2310
2311 CatalogState::add_to_audit_log(
2312 &state.system_configuration,
2313 oracle_write_ts,
2314 session,
2315 tx,
2316 audit_events,
2317 EventType::Alter,
2318 ObjectType::System,
2319 EventDetails::SetV1(mz_audit_log::SetV1 {
2320 name,
2321 value: Some(value.borrow().to_vec().join(", ")),
2322 }),
2323 )?;
2324 }
2325 Op::ResetSystemConfiguration { name } => {
2326 tx.remove_system_config(&name);
2327 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2332 tx.reset_0dt_deployment_max_wait()?;
2333 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2334 tx.reset_0dt_deployment_ddl_check_interval()?;
2335 }
2336
2337 CatalogState::add_to_audit_log(
2338 &state.system_configuration,
2339 oracle_write_ts,
2340 session,
2341 tx,
2342 audit_events,
2343 EventType::Alter,
2344 ObjectType::System,
2345 EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2346 )?;
2347 }
2348 Op::ResetAllSystemConfiguration => {
2349 tx.clear_system_configs();
2350 tx.reset_0dt_deployment_max_wait()?;
2351 tx.reset_0dt_deployment_ddl_check_interval()?;
2352
2353 CatalogState::add_to_audit_log(
2354 &state.system_configuration,
2355 oracle_write_ts,
2356 session,
2357 tx,
2358 audit_events,
2359 EventType::Alter,
2360 ObjectType::System,
2361 EventDetails::ResetAllV1,
2362 )?;
2363 }
2364 Op::WeirdStorageUsageUpdates {
2365 object_id,
2366 size_bytes,
2367 collection_timestamp,
2368 } => {
2369 let id = tx.allocate_storage_usage_ids()?;
2370 let metric =
2371 VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2372 let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2373 let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2374 weird_builtin_table_update = Some(builtin_table_update);
2375 }
2376 };
2377 Ok((weird_builtin_table_update, temporary_item_updates))
2378 }
2379
2380 fn log_update(state: &CatalogState, id: &CatalogItemId) {
2381 let entry = state.get_entry(id);
2382 info!(
2383 "update {} {} ({})",
2384 entry.item_type(),
2385 state.resolve_full_name(entry.name(), entry.conn_id()),
2386 id
2387 );
2388 }
2389
2390 fn update_privilege_owners(
2394 privileges: &mut PrivilegeMap,
2395 old_owner: RoleId,
2396 new_owner: RoleId,
2397 ) {
2398 let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2400
2401 let mut new_present = false;
2402 for privilege in flat_privileges.iter_mut() {
2403 if privilege.grantor == old_owner {
2406 privilege.grantor = new_owner;
2407 } else if privilege.grantor == new_owner {
2408 new_present = true;
2409 }
2410 if privilege.grantee == old_owner {
2412 privilege.grantee = new_owner;
2413 } else if privilege.grantee == new_owner {
2414 new_present = true;
2415 }
2416 }
2417
2418 if new_present {
2422 let privilege_map: BTreeMap<_, Vec<_>> =
2424 flat_privileges
2425 .into_iter()
2426 .fold(BTreeMap::new(), |mut accum, privilege| {
2427 accum
2428 .entry((privilege.grantee, privilege.grantor))
2429 .or_default()
2430 .push(privilege);
2431 accum
2432 });
2433
2434 flat_privileges = privilege_map
2436 .into_iter()
2437 .map(|((grantee, grantor), values)|
2438 values.into_iter().fold(
2440 MzAclItem::empty(grantee, grantor),
2441 |mut accum, mz_aclitem| {
2442 accum.acl_mode =
2443 accum.acl_mode.union(mz_aclitem.acl_mode);
2444 accum
2445 },
2446 ))
2447 .collect();
2448 }
2449
2450 *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2451 }
2452}
2453
2454#[derive(Debug, Default)]
2462pub(crate) struct ObjectsToDrop {
2463 pub comments: BTreeSet<CommentObjectId>,
2464 pub databases: BTreeSet<DatabaseId>,
2465 pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2466 pub clusters: BTreeSet<ClusterId>,
2467 pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2468 pub roles: BTreeSet<RoleId>,
2469 pub items: Vec<CatalogItemId>,
2470 pub network_policies: BTreeSet<NetworkPolicyId>,
2471}
2472
2473impl ObjectsToDrop {
2474 pub fn generate(
2475 drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2476 state: &CatalogState,
2477 session: Option<&ConnMeta>,
2478 ) -> Result<Self, AdapterError> {
2479 let mut delta = ObjectsToDrop::default();
2480
2481 for drop_object_info in drop_object_infos {
2482 delta.add_item(drop_object_info, state, session)?;
2483 }
2484
2485 Ok(delta)
2486 }
2487
2488 fn add_item(
2489 &mut self,
2490 drop_object_info: DropObjectInfo,
2491 state: &CatalogState,
2492 session: Option<&ConnMeta>,
2493 ) -> Result<(), AdapterError> {
2494 self.comments
2495 .insert(state.get_comment_id(drop_object_info.to_object_id()));
2496
2497 match drop_object_info {
2498 DropObjectInfo::Database(database_id) => {
2499 let database = &state.database_by_id[&database_id];
2500 if database_id.is_system() {
2501 return Err(AdapterError::Catalog(Error::new(
2502 ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2503 )));
2504 }
2505
2506 self.databases.insert(database_id);
2507 }
2508 DropObjectInfo::Schema((database_spec, schema_spec)) => {
2509 let schema = state.get_schema(
2510 &database_spec,
2511 &schema_spec,
2512 session
2513 .map(|session| session.conn_id())
2514 .unwrap_or(&SYSTEM_CONN_ID),
2515 );
2516 let schema_id: SchemaId = schema_spec.into();
2517 if schema_id.is_system() {
2518 let name = schema.name();
2519 let full_name = state.resolve_full_schema_name(name);
2520 return Err(AdapterError::Catalog(Error::new(
2521 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2522 )));
2523 }
2524
2525 self.schemas.insert(schema_spec, database_spec);
2526 }
2527 DropObjectInfo::Role(role_id) => {
2528 let name = state.get_role(&role_id).name().to_string();
2529 if role_id.is_system() || role_id.is_predefined() {
2530 return Err(AdapterError::Catalog(Error::new(
2531 ErrorKind::ReservedRoleName(name.clone()),
2532 )));
2533 }
2534 state.ensure_not_reserved_role(&role_id)?;
2535
2536 self.roles.insert(role_id);
2537 }
2538 DropObjectInfo::Cluster(cluster_id) => {
2539 let cluster = state.get_cluster(cluster_id);
2540 let name = &cluster.name;
2541 if cluster_id.is_system() {
2542 return Err(AdapterError::Catalog(Error::new(
2543 ErrorKind::ReadOnlyCluster(name.clone()),
2544 )));
2545 }
2546
2547 self.clusters.insert(cluster_id);
2548 }
2549 DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2550 let cluster = state.get_cluster(cluster_id);
2551 let replica = cluster.replica(replica_id).expect("Must exist");
2552
2553 self.replicas
2554 .insert(replica.replica_id, (cluster.id, reason));
2555 }
2556 DropObjectInfo::Item(item_id) => {
2557 let entry = state.get_entry(&item_id);
2558 if item_id.is_system() {
2559 let name = entry.name();
2560 let full_name =
2561 state.resolve_full_name(name, session.map(|session| session.conn_id()));
2562 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2563 full_name.to_string(),
2564 ))));
2565 }
2566
2567 self.items.push(item_id);
2568 }
2569 DropObjectInfo::NetworkPolicy(network_policy_id) => {
2570 let policy = state.get_network_policy(&network_policy_id);
2571 let name = &policy.name;
2572 if network_policy_id.is_system() {
2573 return Err(AdapterError::Catalog(Error::new(
2574 ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
2575 )));
2576 }
2577
2578 self.network_policies.insert(network_policy_id);
2579 }
2580 }
2581
2582 Ok(())
2583 }
2584}
2585
#[cfg(test)]
mod tests {
    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
    use mz_repr::role_id::RoleId;

    use crate::catalog::Catalog;

    /// Builds a privilege map from `initial`, transfers ownership from
    /// `old_owner` to `new_owner`, and asserts that exactly `expected` remains.
    ///
    /// `#[track_caller]` makes a failing assertion point at the calling
    /// scenario rather than at this helper.
    #[track_caller]
    fn assert_transfer_yields(
        initial: Vec<MzAclItem>,
        old_owner: RoleId,
        new_owner: RoleId,
        expected: MzAclItem,
    ) {
        let mut privileges = PrivilegeMap::from_mz_acl_items(initial);
        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
        assert_eq!(1, privileges.all_values().count());
        assert_eq!(
            vec![expected],
            privileges.all_values_owned().collect::<Vec<_>>()
        );
    }

    #[mz_ore::test]
    fn test_update_privilege_owners() {
        let old_owner = RoleId::User(1);
        let new_owner = RoleId::User(2);
        let other_role = RoleId::User(3);

        let acl = |grantee: RoleId, grantor: RoleId, acl_mode: AclMode| MzAclItem {
            grantee,
            grantor,
            acl_mode,
        };

        // Same grantee, granted once by the old owner and once by the new one:
        // the grants collapse into a single grant from the new owner.
        assert_transfer_yields(
            vec![
                acl(other_role, old_owner, AclMode::UPDATE),
                acl(other_role, new_owner, AclMode::SELECT),
            ],
            old_owner,
            new_owner,
            acl(
                other_role,
                new_owner,
                AclMode::SELECT.union(AclMode::UPDATE),
            ),
        );

        // Same grantor, granted once to the old owner and once to the new one:
        // the grants collapse into a single grant to the new owner.
        assert_transfer_yields(
            vec![
                acl(old_owner, other_role, AclMode::UPDATE),
                acl(new_owner, other_role, AclMode::SELECT),
            ],
            old_owner,
            new_owner,
            acl(
                new_owner,
                other_role,
                AclMode::SELECT.union(AclMode::UPDATE),
            ),
        );

        // Self-grants by the old and the new owner collapse into a single
        // self-grant by the new owner.
        assert_transfer_yields(
            vec![
                acl(old_owner, old_owner, AclMode::UPDATE),
                acl(new_owner, new_owner, AclMode::SELECT),
            ],
            old_owner,
            new_owner,
            acl(
                new_owner,
                new_owner,
                AclMode::SELECT.union(AclMode::UPDATE),
            ),
        );
    }
}