1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22 WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26 ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Transaction};
31use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
32use mz_catalog::memory::objects::{
33 CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, SourceReferences, StateDiff,
34 StateUpdate, StateUpdateKind, TemporaryItem,
35};
36use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
37use mz_controller_types::{ClusterId, ReplicaId};
38use mz_ore::collections::HashSet;
39use mz_ore::instrument;
40use mz_ore::now::EpochMillis;
41use mz_persist_types::ShardId;
42use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
43use mz_repr::network_policy_id::NetworkPolicyId;
44use mz_repr::role_id::RoleId;
45use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
46use mz_sql::ast::RawDataType;
47use mz_sql::catalog::{
48 CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
49 CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, PasswordAction, PasswordConfig,
50 RoleAttributesRaw, RoleMembership, RoleVars,
51};
52use mz_sql::names::{
53 CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
54 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
55};
56use mz_sql::plan::{NetworkPolicyRule, PlanError};
57use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
58use mz_sql::session::vars::OwnedVarInput;
59use mz_sql::session::vars::{Value as VarValue, VarInput};
60use mz_sql::{DEFAULT_SCHEMA, rbac};
61use mz_sql_parser::ast::{QualifiedReplica, Value};
62use mz_storage_client::storage_collections::StorageCollections;
63use tracing::{info, trace};
64
65use crate::AdapterError;
66use crate::catalog::state::LocalExpressionCache;
67use crate::catalog::{
68 BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
69 catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
70 is_reserved_role_name, object_type_to_audit_object_type,
71 system_object_type_to_audit_object_type,
72};
73use crate::coord::ConnMeta;
74use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
75use crate::coord::cluster_scheduling::SchedulingDecision;
76use crate::util::ResultExt;
77
78#[derive(Debug, Clone)]
79pub enum Op {
80 AlterRetainHistory {
81 id: CatalogItemId,
82 value: Option<Value>,
83 window: CompactionWindow,
84 },
85 AlterRole {
86 id: RoleId,
87 name: String,
88 attributes: RoleAttributesRaw,
89 nopassword: bool,
90 vars: RoleVars,
91 },
92 AlterNetworkPolicy {
93 id: NetworkPolicyId,
94 rules: Vec<NetworkPolicyRule>,
95 name: String,
96 owner_id: RoleId,
97 },
98 AlterAddColumn {
99 id: CatalogItemId,
100 new_global_id: GlobalId,
101 name: ColumnName,
102 typ: SqlColumnType,
103 sql: RawDataType,
104 },
105 AlterMaterializedViewApplyReplacement {
106 id: CatalogItemId,
107 replacement_id: CatalogItemId,
108 },
109 CreateDatabase {
110 name: String,
111 owner_id: RoleId,
112 },
113 CreateSchema {
114 database_id: ResolvedDatabaseSpecifier,
115 schema_name: String,
116 owner_id: RoleId,
117 },
118 CreateRole {
119 name: String,
120 attributes: RoleAttributesRaw,
121 },
122 CreateCluster {
123 id: ClusterId,
124 name: String,
125 introspection_sources: Vec<&'static BuiltinLog>,
126 owner_id: RoleId,
127 config: ClusterConfig,
128 },
129 CreateClusterReplica {
130 cluster_id: ClusterId,
131 name: String,
132 config: ReplicaConfig,
133 owner_id: RoleId,
134 reason: ReplicaCreateDropReason,
135 },
136 CreateItem {
137 id: CatalogItemId,
138 name: QualifiedItemName,
139 item: CatalogItem,
140 owner_id: RoleId,
141 },
142 CreateNetworkPolicy {
143 rules: Vec<NetworkPolicyRule>,
144 name: String,
145 owner_id: RoleId,
146 },
147 Comment {
148 object_id: CommentObjectId,
149 sub_component: Option<usize>,
150 comment: Option<String>,
151 },
152 DropObjects(Vec<DropObjectInfo>),
153 GrantRole {
154 role_id: RoleId,
155 member_id: RoleId,
156 grantor_id: RoleId,
157 },
158 RenameCluster {
159 id: ClusterId,
160 name: String,
161 to_name: String,
162 check_reserved_names: bool,
163 },
164 RenameClusterReplica {
165 cluster_id: ClusterId,
166 replica_id: ReplicaId,
167 name: QualifiedReplica,
168 to_name: String,
169 },
170 RenameItem {
171 id: CatalogItemId,
172 current_full_name: FullItemName,
173 to_name: String,
174 },
175 RenameSchema {
176 database_spec: ResolvedDatabaseSpecifier,
177 schema_spec: SchemaSpecifier,
178 new_name: String,
179 check_reserved_names: bool,
180 },
181 UpdateOwner {
182 id: ObjectId,
183 new_owner: RoleId,
184 },
185 UpdatePrivilege {
186 target_id: SystemObjectId,
187 privilege: MzAclItem,
188 variant: UpdatePrivilegeVariant,
189 },
190 UpdateDefaultPrivilege {
191 privilege_object: DefaultPrivilegeObject,
192 privilege_acl_item: DefaultPrivilegeAclItem,
193 variant: UpdatePrivilegeVariant,
194 },
195 RevokeRole {
196 role_id: RoleId,
197 member_id: RoleId,
198 grantor_id: RoleId,
199 },
200 UpdateClusterConfig {
201 id: ClusterId,
202 name: String,
203 config: ClusterConfig,
204 },
205 UpdateClusterReplicaConfig {
206 cluster_id: ClusterId,
207 replica_id: ReplicaId,
208 config: ReplicaConfig,
209 },
210 UpdateItem {
211 id: CatalogItemId,
212 name: QualifiedItemName,
213 to_item: CatalogItem,
214 },
215 UpdateSourceReferences {
216 source_id: CatalogItemId,
217 references: SourceReferences,
218 },
219 UpdateSystemConfiguration {
220 name: String,
221 value: OwnedVarInput,
222 },
223 ResetSystemConfiguration {
224 name: String,
225 },
226 ResetAllSystemConfiguration,
227 WeirdStorageUsageUpdates {
234 object_id: Option<String>,
235 size_bytes: u64,
236 collection_timestamp: EpochMillis,
237 },
238 TransactionDryRun,
244}
245
246#[derive(Debug, Clone)]
250pub enum DropObjectInfo {
251 Cluster(ClusterId),
252 ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
253 Database(DatabaseId),
254 Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
255 Role(RoleId),
256 Item(CatalogItemId),
257 NetworkPolicy(NetworkPolicyId),
258}
259
260impl DropObjectInfo {
261 pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
264 match id {
265 ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
266 ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
267 cluster_id,
268 replica_id,
269 ReplicaCreateDropReason::Manual,
270 )),
271 ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
272 ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
273 ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
274 ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
275 ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
276 }
277 }
278
279 fn to_object_id(&self) -> ObjectId {
282 match &self {
283 DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
284 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
285 ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
286 }
287 DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
288 DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
289 DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
290 DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
291 DropObjectInfo::NetworkPolicy(network_policy_id) => {
292 ObjectId::NetworkPolicy(network_policy_id.clone())
293 }
294 }
295 }
296}
297
298#[derive(Debug, Clone)]
300pub enum ReplicaCreateDropReason {
301 Manual,
306 ClusterScheduling(Vec<SchedulingDecision>),
309}
310
311impl ReplicaCreateDropReason {
312 pub fn into_audit_log(
313 self,
314 ) -> (
315 CreateOrDropClusterReplicaReasonV1,
316 Option<SchedulingDecisionsWithReasonsV2>,
317 ) {
318 let (reason, scheduling_policies) = match self {
319 ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
320 ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
321 CreateOrDropClusterReplicaReasonV1::Schedule,
322 Some(scheduling_decisions),
323 ),
324 };
325 (
326 reason,
327 scheduling_policies
328 .as_ref()
329 .map(SchedulingDecision::reasons_to_audit_log_reasons),
330 )
331 }
332}
333
334pub struct TransactionResult {
335 pub builtin_table_updates: Vec<BuiltinTableUpdate>,
336 pub catalog_updates: Vec<ParsedStateUpdate>,
338 pub audit_events: Vec<VersionedEvent>,
339}
340
341impl Catalog {
342 fn should_audit_log_item(item: &CatalogItem) -> bool {
343 !item.is_temporary()
344 }
345
346 fn temporary_ids(
349 &self,
350 ops: &[Op],
351 temporary_drops: BTreeSet<(&ConnectionId, String)>,
352 ) -> Result<BTreeSet<CatalogItemId>, Error> {
353 let mut creating = BTreeSet::new();
354 let mut temporary_ids = BTreeSet::new();
355 for op in ops.iter() {
356 if let Op::CreateItem {
357 id,
358 name,
359 item,
360 owner_id: _,
361 } = op
362 {
363 if let Some(conn_id) = item.conn_id() {
364 if self.item_exists_in_temp_schemas(conn_id, &name.item)
365 && !temporary_drops.contains(&(conn_id, name.item.clone()))
366 || creating.contains(&(conn_id, &name.item))
367 {
368 return Err(
369 SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
370 );
371 } else {
372 creating.insert((conn_id, &name.item));
373 temporary_ids.insert(id.clone());
374 }
375 }
376 }
377 }
378 Ok(temporary_ids)
379 }
380
381 #[instrument(name = "catalog::transact")]
382 pub async fn transact(
383 &mut self,
384 storage_collections: Option<
387 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
388 >,
389 oracle_write_ts: mz_repr::Timestamp,
390 session: Option<&ConnMeta>,
391 ops: Vec<Op>,
392 ) -> Result<TransactionResult, AdapterError> {
393 trace!("transact: {:?}", ops);
394 fail::fail_point!("catalog_transact", |arg| {
395 Err(AdapterError::Unstructured(anyhow::anyhow!(
396 "failpoint: {arg:?}"
397 )))
398 });
399
400 let drop_ids: BTreeSet<CatalogItemId> = ops
401 .iter()
402 .filter_map(|op| match op {
403 Op::DropObjects(drop_object_infos) => {
404 let ids = drop_object_infos.iter().map(|info| info.to_object_id());
405 let item_ids = ids.filter_map(|id| match id {
406 ObjectId::Item(id) => Some(id),
407 _ => None,
408 });
409 Some(item_ids)
410 }
411 _ => None,
412 })
413 .flatten()
414 .collect();
415 let temporary_drops = drop_ids
416 .iter()
417 .filter_map(|id| {
418 let entry = self.get_entry(id);
419 match entry.item().conn_id() {
420 Some(conn_id) => Some((conn_id, entry.name().item.clone())),
421 None => None,
422 }
423 })
424 .collect();
425 let dropped_global_ids = drop_ids
426 .iter()
427 .flat_map(|item_id| self.get_global_ids(item_id))
428 .collect();
429
430 let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
431 let mut builtin_table_updates = vec![];
432 let mut catalog_updates = vec![];
433 let mut audit_events = vec![];
434 let mut storage = self.storage().await;
435 let mut tx = storage
436 .transaction()
437 .await
438 .unwrap_or_terminate("starting catalog transaction");
439
440 let new_state = Self::transact_inner(
441 storage_collections,
442 oracle_write_ts,
443 session,
444 ops,
445 temporary_ids,
446 &mut builtin_table_updates,
447 &mut catalog_updates,
448 &mut audit_events,
449 &mut tx,
450 &self.state,
451 )
452 .await?;
453
454 tx.commit(oracle_write_ts)
459 .await
460 .unwrap_or_terminate("catalog storage transaction commit must succeed");
461
462 drop(storage);
465 if let Some(new_state) = new_state {
466 self.transient_revision += 1;
467 self.state = new_state;
468 }
469
470 let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
472 if self.state.system_config().enable_mz_notices() {
473 self.state().pack_optimizer_notices(
475 &mut builtin_table_updates,
476 dropped_notices.iter(),
477 Diff::MINUS_ONE,
478 );
479 }
480
481 Ok(TransactionResult {
482 builtin_table_updates,
483 catalog_updates,
484 audit_events,
485 })
486 }
487
488 #[instrument(name = "catalog::transact_inner")]
496 async fn transact_inner(
497 storage_collections: Option<
498 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
499 >,
500 oracle_write_ts: mz_repr::Timestamp,
501 session: Option<&ConnMeta>,
502 mut ops: Vec<Op>,
503 temporary_ids: BTreeSet<CatalogItemId>,
504 builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
505 parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
506 audit_events: &mut Vec<VersionedEvent>,
507 tx: &mut Transaction<'_>,
508 state: &CatalogState,
509 ) -> Result<Option<CatalogState>, AdapterError> {
510 let mut preliminary_state = Cow::Borrowed(state);
537
538 let mut state = Cow::Borrowed(state);
540
541 let dry_run_ops = match ops.last() {
542 Some(Op::TransactionDryRun) => {
543 ops.pop();
545 assert!(!ops.is_empty(), "TransactionDryRun must not be the only op");
546 ops.clone()
547 }
548 Some(_) => vec![],
549 None => return Ok(None),
550 };
551
552 let mut storage_collections_to_create = BTreeSet::new();
553 let mut storage_collections_to_drop = BTreeSet::new();
554 let mut storage_collections_to_register = BTreeMap::new();
555
556 let mut updates = Vec::new();
557
558 for op in ops {
559 let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
560 oracle_write_ts,
561 session,
562 op,
563 &temporary_ids,
564 audit_events,
565 tx,
566 &*preliminary_state,
567 &mut storage_collections_to_create,
568 &mut storage_collections_to_drop,
569 &mut storage_collections_to_register,
570 )
571 .await?;
572
573 if let Some(builtin_table_update) = weird_builtin_table_update {
582 builtin_table_updates.push(builtin_table_update);
583 }
584
585 let upper = tx.upper();
590 let temporary_item_updates =
591 temporary_item_updates
592 .into_iter()
593 .map(|(item, diff)| StateUpdate {
594 kind: StateUpdateKind::TemporaryItem(item),
595 ts: upper,
596 diff,
597 });
598
599 let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
600 op_updates.extend(temporary_item_updates);
601 if !op_updates.is_empty() {
602 let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
603 .to_mut()
604 .apply_updates(op_updates.clone(), &mut LocalExpressionCache::Closed)
605 .await;
606 }
607 updates.append(&mut op_updates);
608 }
609
610 if !updates.is_empty() {
611 let (op_builtin_table_updates, op_catalog_updates) = state
612 .to_mut()
613 .apply_updates(updates.clone(), &mut LocalExpressionCache::Closed)
614 .await;
615 let op_builtin_table_updates = state
616 .to_mut()
617 .resolve_builtin_table_updates(op_builtin_table_updates);
618 builtin_table_updates.extend(op_builtin_table_updates);
619 parsed_catalog_updates.extend(op_catalog_updates);
620 }
621
622 if dry_run_ops.is_empty() {
623 if let Some(c) = storage_collections {
625 c.prepare_state(
626 tx,
627 storage_collections_to_create,
628 storage_collections_to_drop,
629 storage_collections_to_register,
630 )
631 .await?;
632 }
633
634 let updates = tx.get_and_commit_op_updates();
635 if !updates.is_empty() {
636 let (op_builtin_table_updates, op_catalog_updates) = state
637 .to_mut()
638 .apply_updates(updates.clone(), &mut LocalExpressionCache::Closed)
639 .await;
640 let op_builtin_table_updates = state
641 .to_mut()
642 .resolve_builtin_table_updates(op_builtin_table_updates);
643 builtin_table_updates.extend(op_builtin_table_updates);
644 parsed_catalog_updates.extend(op_catalog_updates);
645 }
646
647 match state {
648 Cow::Owned(state) => Ok(Some(state)),
649 Cow::Borrowed(_) => Ok(None),
650 }
651 } else {
652 Err(AdapterError::TransactionDryRun {
653 new_ops: dry_run_ops,
654 new_state: state.into_owned(),
655 })
656 }
657 }
658
659 #[instrument]
667 async fn transact_op(
668 oracle_write_ts: mz_repr::Timestamp,
669 session: Option<&ConnMeta>,
670 op: Op,
671 temporary_ids: &BTreeSet<CatalogItemId>,
672 audit_events: &mut Vec<VersionedEvent>,
673 tx: &mut Transaction<'_>,
674 state: &CatalogState,
675 storage_collections_to_create: &mut BTreeSet<GlobalId>,
676 storage_collections_to_drop: &mut BTreeSet<GlobalId>,
677 storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
678 ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
679 let mut weird_builtin_table_update = None;
680 let mut temporary_item_updates = Vec::new();
681
682 match op {
683 Op::TransactionDryRun => {
684 unreachable!("TransactionDryRun can only be used a final element of ops")
685 }
686 Op::AlterRetainHistory { id, value, window } => {
687 let entry = state.get_entry(&id);
688 if id.is_system() {
689 let name = entry.name();
690 let full_name =
691 state.resolve_full_name(name, session.map(|session| session.conn_id()));
692 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
693 full_name.to_string(),
694 ))));
695 }
696
697 let mut new_entry = entry.clone();
698 let previous = new_entry
699 .item
700 .update_retain_history(value.clone(), window)
701 .map_err(|_| {
702 AdapterError::Catalog(Error::new(ErrorKind::Internal(
703 "planner should have rejected invalid alter retain history item type"
704 .to_string(),
705 )))
706 })?;
707
708 if Self::should_audit_log_item(new_entry.item()) {
709 let details =
710 EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
711 id: id.to_string(),
712 old_history: previous.map(|previous| previous.to_string()),
713 new_history: value.map(|v| v.to_string()),
714 });
715 CatalogState::add_to_audit_log(
716 &state.system_configuration,
717 oracle_write_ts,
718 session,
719 tx,
720 audit_events,
721 EventType::Alter,
722 catalog_type_to_audit_object_type(new_entry.item().typ()),
723 details,
724 )?;
725 }
726
727 tx.update_item(id, new_entry.into())?;
728
729 Self::log_update(state, &id);
730 }
731 Op::AlterRole {
732 id,
733 name,
734 attributes,
735 nopassword,
736 vars,
737 } => {
738 state.ensure_not_reserved_role(&id)?;
739
740 let mut existing_role = state.get_role(&id).clone();
741 let password = attributes.password.clone();
742 let scram_iterations = attributes
743 .scram_iterations
744 .unwrap_or_else(|| state.system_config().scram_iterations());
745 existing_role.attributes = attributes.into();
746 existing_role.vars = vars;
747 let password_action = if nopassword {
748 PasswordAction::Clear
749 } else if let Some(password) = password {
750 PasswordAction::Set(PasswordConfig {
751 password,
752 scram_iterations,
753 })
754 } else {
755 PasswordAction::NoChange
756 };
757 tx.update_role(id, existing_role.into(), password_action)?;
758
759 CatalogState::add_to_audit_log(
760 &state.system_configuration,
761 oracle_write_ts,
762 session,
763 tx,
764 audit_events,
765 EventType::Alter,
766 ObjectType::Role,
767 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
768 id: id.to_string(),
769 name: name.clone(),
770 }),
771 )?;
772
773 info!("update role {name} ({id})");
774 }
775 Op::AlterNetworkPolicy {
776 id,
777 rules,
778 name,
779 owner_id: _owner_id,
780 } => {
781 let existing_policy = state.get_network_policy(&id).clone();
782 let mut policy: NetworkPolicy = existing_policy.into();
783 policy.rules = rules;
784 if is_reserved_name(&name) {
785 return Err(AdapterError::Catalog(Error::new(
786 ErrorKind::ReservedNetworkPolicyName(name),
787 )));
788 }
789 tx.update_network_policy(id, policy.clone())?;
790
791 CatalogState::add_to_audit_log(
792 &state.system_configuration,
793 oracle_write_ts,
794 session,
795 tx,
796 audit_events,
797 EventType::Alter,
798 ObjectType::NetworkPolicy,
799 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
800 id: id.to_string(),
801 name: name.clone(),
802 }),
803 )?;
804
805 info!("update network policy {name} ({id})");
806 }
807 Op::AlterAddColumn {
808 id,
809 new_global_id,
810 name,
811 typ,
812 sql,
813 } => {
814 let mut new_entry = state.get_entry(&id).clone();
815 let version = new_entry.item.add_column(name, typ, sql)?;
816 let shard_id = state
819 .storage_metadata()
820 .get_collection_shard(new_entry.latest_global_id())?;
821
822 let CatalogItem::Table(table) = &mut new_entry.item else {
824 return Err(AdapterError::Unsupported("adding columns to non-Table"));
825 };
826 table.collections.insert(version, new_global_id);
827
828 tx.update_item(id, new_entry.into())?;
829 storage_collections_to_register.insert(new_global_id, shard_id);
830 }
831 Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
832 let mut new_entry = state.get_entry(&id).clone();
833 let replacement = state.get_entry(&replacement_id);
834
835 let new_audit_events =
836 apply_replacement_audit_events(state, &new_entry, replacement);
837
838 let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
839 return Err(AdapterError::internal(
840 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
841 "id must refer to a materialized view",
842 ));
843 };
844 let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
845 return Err(AdapterError::internal(
846 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
847 "replacement_id must refer to a materialized view",
848 ));
849 };
850
851 mv.apply_replacement(replacement_mv.clone());
852
853 tx.remove_item(replacement_id)?;
854 tx.update_item(id, new_entry.into())?;
855
856 let comment_id = CommentObjectId::MaterializedView(replacement_id);
857 tx.drop_comments(&[comment_id].into())?;
858
859 for (event_type, details) in new_audit_events {
860 CatalogState::add_to_audit_log(
861 &state.system_configuration,
862 oracle_write_ts,
863 session,
864 tx,
865 audit_events,
866 event_type,
867 ObjectType::MaterializedView,
868 details,
869 )?;
870 }
871 }
872 Op::CreateDatabase { name, owner_id } => {
873 let database_owner_privileges = vec![rbac::owner_privilege(
874 mz_sql::catalog::ObjectType::Database,
875 owner_id,
876 )];
877 let database_default_privileges = state
878 .default_privileges
879 .get_applicable_privileges(
880 owner_id,
881 None,
882 None,
883 mz_sql::catalog::ObjectType::Database,
884 )
885 .map(|item| item.mz_acl_item(owner_id));
886 let database_privileges: Vec<_> = merge_mz_acl_items(
887 database_owner_privileges
888 .into_iter()
889 .chain(database_default_privileges),
890 )
891 .collect();
892
893 let schema_owner_privileges = vec![rbac::owner_privilege(
894 mz_sql::catalog::ObjectType::Schema,
895 owner_id,
896 )];
897 let schema_default_privileges = state
898 .default_privileges
899 .get_applicable_privileges(
900 owner_id,
901 None,
902 None,
903 mz_sql::catalog::ObjectType::Schema,
904 )
905 .map(|item| item.mz_acl_item(owner_id))
906 .chain(std::iter::once(MzAclItem {
908 grantee: RoleId::Public,
909 grantor: owner_id,
910 acl_mode: AclMode::USAGE,
911 }));
912 let schema_privileges: Vec<_> = merge_mz_acl_items(
913 schema_owner_privileges
914 .into_iter()
915 .chain(schema_default_privileges),
916 )
917 .collect();
918
919 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
920 let (database_id, _) = tx.insert_user_database(
921 &name,
922 owner_id,
923 database_privileges.clone(),
924 &temporary_oids,
925 )?;
926 let (schema_id, _) = tx.insert_user_schema(
927 database_id,
928 DEFAULT_SCHEMA,
929 owner_id,
930 schema_privileges.clone(),
931 &temporary_oids,
932 )?;
933 CatalogState::add_to_audit_log(
934 &state.system_configuration,
935 oracle_write_ts,
936 session,
937 tx,
938 audit_events,
939 EventType::Create,
940 ObjectType::Database,
941 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
942 id: database_id.to_string(),
943 name: name.clone(),
944 }),
945 )?;
946 info!("create database {}", name);
947
948 CatalogState::add_to_audit_log(
949 &state.system_configuration,
950 oracle_write_ts,
951 session,
952 tx,
953 audit_events,
954 EventType::Create,
955 ObjectType::Schema,
956 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
957 id: schema_id.to_string(),
958 name: DEFAULT_SCHEMA.to_string(),
959 database_name: Some(name),
960 }),
961 )?;
962 }
963 Op::CreateSchema {
964 database_id,
965 schema_name,
966 owner_id,
967 } => {
968 if is_reserved_name(&schema_name) {
969 return Err(AdapterError::Catalog(Error::new(
970 ErrorKind::ReservedSchemaName(schema_name),
971 )));
972 }
973 let database_id = match database_id {
974 ResolvedDatabaseSpecifier::Id(id) => id,
975 ResolvedDatabaseSpecifier::Ambient => {
976 return Err(AdapterError::Catalog(Error::new(
977 ErrorKind::ReadOnlySystemSchema(schema_name),
978 )));
979 }
980 };
981 let owner_privileges = vec![rbac::owner_privilege(
982 mz_sql::catalog::ObjectType::Schema,
983 owner_id,
984 )];
985 let default_privileges = state
986 .default_privileges
987 .get_applicable_privileges(
988 owner_id,
989 Some(database_id),
990 None,
991 mz_sql::catalog::ObjectType::Schema,
992 )
993 .map(|item| item.mz_acl_item(owner_id));
994 let privileges: Vec<_> =
995 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
996 .collect();
997 let (schema_id, _) = tx.insert_user_schema(
998 database_id,
999 &schema_name,
1000 owner_id,
1001 privileges.clone(),
1002 &state.get_temporary_oids().collect(),
1003 )?;
1004 CatalogState::add_to_audit_log(
1005 &state.system_configuration,
1006 oracle_write_ts,
1007 session,
1008 tx,
1009 audit_events,
1010 EventType::Create,
1011 ObjectType::Schema,
1012 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1013 id: schema_id.to_string(),
1014 name: schema_name.clone(),
1015 database_name: Some(state.database_by_id[&database_id].name.clone()),
1016 }),
1017 )?;
1018 }
1019 Op::CreateRole { name, attributes } => {
1020 if is_reserved_role_name(&name) {
1021 return Err(AdapterError::Catalog(Error::new(
1022 ErrorKind::ReservedRoleName(name),
1023 )));
1024 }
1025 let membership = RoleMembership::new();
1026 let vars = RoleVars::default();
1027 let (id, _) = tx.insert_user_role(
1028 name.clone(),
1029 attributes.clone(),
1030 membership.clone(),
1031 vars.clone(),
1032 &state.get_temporary_oids().collect(),
1033 )?;
1034 CatalogState::add_to_audit_log(
1035 &state.system_configuration,
1036 oracle_write_ts,
1037 session,
1038 tx,
1039 audit_events,
1040 EventType::Create,
1041 ObjectType::Role,
1042 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1043 id: id.to_string(),
1044 name: name.clone(),
1045 }),
1046 )?;
1047 info!("create role {}", name);
1048 }
1049 Op::CreateCluster {
1050 id,
1051 name,
1052 introspection_sources,
1053 owner_id,
1054 config,
1055 } => {
1056 if is_reserved_name(&name) {
1057 return Err(AdapterError::Catalog(Error::new(
1058 ErrorKind::ReservedClusterName(name),
1059 )));
1060 }
1061 let owner_privileges = vec![rbac::owner_privilege(
1062 mz_sql::catalog::ObjectType::Cluster,
1063 owner_id,
1064 )];
1065 let default_privileges = state
1066 .default_privileges
1067 .get_applicable_privileges(
1068 owner_id,
1069 None,
1070 None,
1071 mz_sql::catalog::ObjectType::Cluster,
1072 )
1073 .map(|item| item.mz_acl_item(owner_id));
1074 let privileges: Vec<_> =
1075 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1076 .collect();
1077 let introspection_source_ids: Vec<_> = introspection_sources
1078 .iter()
1079 .map(|introspection_source| {
1080 Transaction::allocate_introspection_source_index_id(
1081 &id,
1082 introspection_source.variant,
1083 )
1084 })
1085 .collect();
1086
1087 let introspection_sources = introspection_sources
1088 .into_iter()
1089 .zip_eq(introspection_source_ids)
1090 .map(|(log, (item_id, gid))| (log, item_id, gid))
1091 .collect();
1092
1093 tx.insert_user_cluster(
1094 id,
1095 &name,
1096 introspection_sources,
1097 owner_id,
1098 privileges.clone(),
1099 config.clone().into(),
1100 &state.get_temporary_oids().collect(),
1101 )?;
1102 CatalogState::add_to_audit_log(
1103 &state.system_configuration,
1104 oracle_write_ts,
1105 session,
1106 tx,
1107 audit_events,
1108 EventType::Create,
1109 ObjectType::Cluster,
1110 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1111 id: id.to_string(),
1112 name: name.clone(),
1113 }),
1114 )?;
1115 info!("create cluster {}", name);
1116 }
1117 Op::CreateClusterReplica {
1118 cluster_id,
1119 name,
1120 config,
1121 owner_id,
1122 reason,
1123 } => {
1124 if is_reserved_name(&name) {
1125 return Err(AdapterError::Catalog(Error::new(
1126 ErrorKind::ReservedReplicaName(name),
1127 )));
1128 }
1129 let cluster = state.get_cluster(cluster_id);
1130 let id =
1131 tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1132 if let ReplicaLocation::Managed(ManagedReplicaLocation {
1133 size,
1134 billed_as,
1135 internal,
1136 ..
1137 }) = &config.location
1138 {
1139 let (reason, scheduling_policies) = reason.into_audit_log();
1140 let details = EventDetails::CreateClusterReplicaV4(
1141 mz_audit_log::CreateClusterReplicaV4 {
1142 cluster_id: cluster_id.to_string(),
1143 cluster_name: cluster.name.clone(),
1144 replica_id: Some(id.to_string()),
1145 replica_name: name.clone(),
1146 logical_size: size.clone(),
1147 billed_as: billed_as.clone(),
1148 internal: *internal,
1149 reason,
1150 scheduling_policies,
1151 },
1152 );
1153 CatalogState::add_to_audit_log(
1154 &state.system_configuration,
1155 oracle_write_ts,
1156 session,
1157 tx,
1158 audit_events,
1159 EventType::Create,
1160 ObjectType::ClusterReplica,
1161 details,
1162 )?;
1163 }
1164 }
1165 Op::CreateItem {
1166 id,
1167 name,
1168 item,
1169 owner_id,
1170 } => {
1171 state.check_unstable_dependencies(&item)?;
1172
1173 match &item {
1174 CatalogItem::Table(table) => {
1175 let gids: Vec<_> = table.global_ids().collect();
1176 assert_eq!(gids.len(), 1);
1177 storage_collections_to_create.extend(gids);
1178 }
1179 CatalogItem::Source(source) => {
1180 storage_collections_to_create.insert(source.global_id());
1181 }
1182 CatalogItem::MaterializedView(mv) => {
1183 let mv_gid = mv.global_id_writes();
1184 if let Some(target_id) = mv.replacement_target {
1185 let target_gid = state.get_entry(&target_id).latest_global_id();
1186 let shard_id =
1187 state.storage_metadata().get_collection_shard(target_gid)?;
1188 storage_collections_to_register.insert(mv_gid, shard_id);
1189 } else {
1190 storage_collections_to_create.insert(mv_gid);
1191 }
1192 }
1193 CatalogItem::ContinualTask(ct) => {
1194 storage_collections_to_create.insert(ct.global_id());
1195 }
1196 CatalogItem::Sink(sink) => {
1197 storage_collections_to_create.insert(sink.global_id());
1198 }
1199 CatalogItem::Log(_)
1200 | CatalogItem::View(_)
1201 | CatalogItem::Index(_)
1202 | CatalogItem::Type(_)
1203 | CatalogItem::Func(_)
1204 | CatalogItem::Secret(_)
1205 | CatalogItem::Connection(_) => (),
1206 }
1207
1208 let system_user = session.map_or(false, |s| s.user().is_system_user());
1209 if !system_user {
1210 if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1211 let cluster_name = state.clusters_by_id[&id].name.clone();
1212 return Err(AdapterError::Catalog(Error::new(
1213 ErrorKind::ReadOnlyCluster(cluster_name),
1214 )));
1215 }
1216 }
1217
1218 let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1219 let default_privileges = state
1220 .default_privileges
1221 .get_applicable_privileges(
1222 owner_id,
1223 name.qualifiers.database_spec.id(),
1224 Some(name.qualifiers.schema_spec.into()),
1225 item.typ().into(),
1226 )
1227 .map(|item| item.mz_acl_item(owner_id));
1228 let progress_source_privilege = if item.is_progress_source() {
1230 Some(MzAclItem {
1231 grantee: MZ_SUPPORT_ROLE_ID,
1232 grantor: owner_id,
1233 acl_mode: AclMode::SELECT,
1234 })
1235 } else {
1236 None
1237 };
1238 let privileges: Vec<_> = merge_mz_acl_items(
1239 owner_privileges
1240 .into_iter()
1241 .chain(default_privileges)
1242 .chain(progress_source_privilege),
1243 )
1244 .collect();
1245
1246 let temporary_oids = state.get_temporary_oids().collect();
1247
1248 if item.is_temporary() {
1249 if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1250 || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1251 {
1252 return Err(AdapterError::Catalog(Error::new(
1253 ErrorKind::InvalidTemporarySchema,
1254 )));
1255 }
1256 let oid = tx.allocate_oid(&temporary_oids)?;
1257
1258 let schema_id = name.qualifiers.schema_spec.clone().into();
1259 let item_type = item.typ();
1260 let (create_sql, global_id, versions) = item.to_serialized();
1261
1262 let item = TemporaryItem {
1263 id,
1264 oid,
1265 global_id,
1266 schema_id,
1267 name: name.item.clone(),
1268 create_sql,
1269 conn_id: item.conn_id().cloned(),
1270 owner_id,
1271 privileges: privileges.clone(),
1272 extra_versions: versions,
1273 };
1274 temporary_item_updates.push((item, StateDiff::Addition));
1275
1276 info!(
1277 "create temporary {} {} ({})",
1278 item_type,
1279 state.resolve_full_name(&name, None),
1280 id
1281 );
1282 } else {
1283 if let Some(temp_id) =
1284 item.uses()
1285 .iter()
1286 .find(|id| match state.try_get_entry(*id) {
1287 Some(entry) => entry.item().is_temporary(),
1288 None => temporary_ids.contains(id),
1289 })
1290 {
1291 let temp_item = state.get_entry(temp_id);
1292 return Err(AdapterError::Catalog(Error::new(
1293 ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1294 )));
1295 }
1296 if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1297 && !system_user
1298 {
1299 let schema_name = state
1300 .resolve_full_name(&name, session.map(|session| session.conn_id()))
1301 .schema;
1302 return Err(AdapterError::Catalog(Error::new(
1303 ErrorKind::ReadOnlySystemSchema(schema_name),
1304 )));
1305 }
1306 let schema_id = name.qualifiers.schema_spec.clone().into();
1307 let item_type = item.typ();
1308 let (create_sql, global_id, versions) = item.to_serialized();
1309 tx.insert_user_item(
1310 id,
1311 global_id,
1312 schema_id,
1313 &name.item,
1314 create_sql,
1315 owner_id,
1316 privileges.clone(),
1317 &temporary_oids,
1318 versions,
1319 )?;
1320 info!(
1321 "create {} {} ({})",
1322 item_type,
1323 state.resolve_full_name(&name, None),
1324 id
1325 );
1326 }
1327
1328 if Self::should_audit_log_item(&item) {
1329 let name = Self::full_name_detail(
1330 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1331 );
1332 let details = match &item {
1333 CatalogItem::Source(s) => {
1334 let cluster_id = match s.data_source {
1335 DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1338 match state.get_entry(&ingestion_id).cluster_id() {
1339 Some(cluster_id) => Some(cluster_id.to_string()),
1340 None => None,
1341 }
1342 }
1343 _ => match item.cluster_id() {
1344 Some(cluster_id) => Some(cluster_id.to_string()),
1345 None => None,
1346 },
1347 };
1348
1349 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1350 id: id.to_string(),
1351 cluster_id,
1352 name,
1353 external_type: s.source_type().to_string(),
1354 })
1355 }
1356 CatalogItem::Sink(s) => {
1357 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1358 id: id.to_string(),
1359 cluster_id: Some(s.cluster_id.to_string()),
1360 name,
1361 external_type: s.sink_type().to_string(),
1362 })
1363 }
1364 CatalogItem::Index(i) => {
1365 EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1366 id: id.to_string(),
1367 name,
1368 cluster_id: i.cluster_id.to_string(),
1369 })
1370 }
1371 CatalogItem::MaterializedView(mv) => {
1372 EventDetails::CreateMaterializedViewV1(
1373 mz_audit_log::CreateMaterializedViewV1 {
1374 id: id.to_string(),
1375 name,
1376 cluster_id: mv.cluster_id.to_string(),
1377 replacement_target_id: mv
1378 .replacement_target
1379 .map(|id| id.to_string()),
1380 },
1381 )
1382 }
1383 _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1384 id: id.to_string(),
1385 name,
1386 }),
1387 };
1388 CatalogState::add_to_audit_log(
1389 &state.system_configuration,
1390 oracle_write_ts,
1391 session,
1392 tx,
1393 audit_events,
1394 EventType::Create,
1395 catalog_type_to_audit_object_type(item.typ()),
1396 details,
1397 )?;
1398 }
1399 }
1400 Op::CreateNetworkPolicy {
1401 rules,
1402 name,
1403 owner_id,
1404 } => {
1405 if state.network_policies_by_name.contains_key(&name) {
1406 return Err(AdapterError::PlanError(PlanError::Catalog(
1407 SqlCatalogError::NetworkPolicyAlreadyExists(name),
1408 )));
1409 }
1410 if is_reserved_name(&name) {
1411 return Err(AdapterError::Catalog(Error::new(
1412 ErrorKind::ReservedNetworkPolicyName(name),
1413 )));
1414 }
1415
1416 let owner_privileges = vec![rbac::owner_privilege(
1417 mz_sql::catalog::ObjectType::NetworkPolicy,
1418 owner_id,
1419 )];
1420 let default_privileges = state
1421 .default_privileges
1422 .get_applicable_privileges(
1423 owner_id,
1424 None,
1425 None,
1426 mz_sql::catalog::ObjectType::NetworkPolicy,
1427 )
1428 .map(|item| item.mz_acl_item(owner_id));
1429 let privileges: Vec<_> =
1430 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1431 .collect();
1432
1433 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1434 let id = tx.insert_user_network_policy(
1435 name.clone(),
1436 rules,
1437 privileges,
1438 owner_id,
1439 &temporary_oids,
1440 )?;
1441
1442 CatalogState::add_to_audit_log(
1443 &state.system_configuration,
1444 oracle_write_ts,
1445 session,
1446 tx,
1447 audit_events,
1448 EventType::Create,
1449 ObjectType::NetworkPolicy,
1450 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1451 id: id.to_string(),
1452 name: name.clone(),
1453 }),
1454 )?;
1455
1456 info!("created network policy {name} ({id})");
1457 }
1458 Op::Comment {
1459 object_id,
1460 sub_component,
1461 comment,
1462 } => {
1463 tx.update_comment(object_id, sub_component, comment)?;
1464 let entry = state.get_comment_id_entry(&object_id);
1465 let should_log = entry
1466 .map(|entry| Self::should_audit_log_item(entry.item()))
1467 .unwrap_or(true);
1469 if let (Some(conn_id), true) =
1472 (session.map(|session| session.conn_id()), should_log)
1473 {
1474 CatalogState::add_to_audit_log(
1475 &state.system_configuration,
1476 oracle_write_ts,
1477 session,
1478 tx,
1479 audit_events,
1480 EventType::Comment,
1481 comment_id_to_audit_object_type(object_id),
1482 EventDetails::IdNameV1(IdNameV1 {
1483 id: format!("{object_id:?}"),
1485 name: state.comment_id_to_audit_log_name(object_id, conn_id),
1486 }),
1487 )?;
1488 }
1489 }
1490 Op::UpdateSourceReferences {
1491 source_id,
1492 references,
1493 } => {
1494 tx.update_source_references(
1495 source_id,
1496 references
1497 .references
1498 .into_iter()
1499 .map(|reference| reference.into())
1500 .collect(),
1501 references.updated_at,
1502 )?;
1503 }
1504 Op::DropObjects(drop_object_infos) => {
1505 let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1507
1508 tx.drop_comments(&delta.comments)?;
1510
1511 let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1513 delta
1514 .items
1515 .iter()
1516 .map(|id| id)
1517 .partition(|id| !state.get_entry(*id).item().is_temporary());
1518 tx.remove_items(&durable_items_to_drop)?;
1519 temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1520 let entry = state.get_entry(&id);
1521 (entry.clone().into(), StateDiff::Retraction)
1522 }));
1523
1524 for item_id in delta.items {
1525 let entry = state.get_entry(&item_id);
1526
1527 if entry.item().is_storage_collection() {
1528 storage_collections_to_drop.extend(entry.global_ids());
1529 }
1530
1531 if state.source_references.contains_key(&item_id) {
1532 tx.remove_source_references(item_id)?;
1533 }
1534
1535 if Self::should_audit_log_item(entry.item()) {
1536 CatalogState::add_to_audit_log(
1537 &state.system_configuration,
1538 oracle_write_ts,
1539 session,
1540 tx,
1541 audit_events,
1542 EventType::Drop,
1543 catalog_type_to_audit_object_type(entry.item().typ()),
1544 EventDetails::IdFullNameV1(IdFullNameV1 {
1545 id: item_id.to_string(),
1546 name: Self::full_name_detail(&state.resolve_full_name(
1547 entry.name(),
1548 session.map(|session| session.conn_id()),
1549 )),
1550 }),
1551 )?;
1552 }
1553 info!(
1554 "drop {} {} ({})",
1555 entry.item_type(),
1556 state.resolve_full_name(entry.name(), entry.conn_id()),
1557 item_id
1558 );
1559 }
1560
1561 let schemas = delta
1563 .schemas
1564 .iter()
1565 .map(|(schema_spec, database_spec)| {
1566 (SchemaId::from(schema_spec), *database_spec)
1567 })
1568 .collect();
1569 tx.remove_schemas(&schemas)?;
1570
1571 for (schema_spec, database_spec) in delta.schemas {
1572 let schema = state.get_schema(
1573 &database_spec,
1574 &schema_spec,
1575 session
1576 .map(|session| session.conn_id())
1577 .unwrap_or(&SYSTEM_CONN_ID),
1578 );
1579
1580 let schema_id = SchemaId::from(schema_spec);
1581 let database_id = match database_spec {
1582 ResolvedDatabaseSpecifier::Ambient => None,
1583 ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1584 };
1585
1586 CatalogState::add_to_audit_log(
1587 &state.system_configuration,
1588 oracle_write_ts,
1589 session,
1590 tx,
1591 audit_events,
1592 EventType::Drop,
1593 ObjectType::Schema,
1594 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1595 id: schema_id.to_string(),
1596 name: schema.name.schema.to_string(),
1597 database_name: database_id
1598 .map(|database_id| state.database_by_id[&database_id].name.clone()),
1599 }),
1600 )?;
1601 }
1602
1603 tx.remove_databases(&delta.databases)?;
1605
1606 for database_id in delta.databases {
1607 let database = state.get_database(&database_id).clone();
1608
1609 CatalogState::add_to_audit_log(
1610 &state.system_configuration,
1611 oracle_write_ts,
1612 session,
1613 tx,
1614 audit_events,
1615 EventType::Drop,
1616 ObjectType::Database,
1617 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1618 id: database_id.to_string(),
1619 name: database.name.clone(),
1620 }),
1621 )?;
1622 }
1623
1624 tx.remove_user_roles(&delta.roles)?;
1626
1627 for role_id in delta.roles {
1628 let role = state
1629 .roles_by_id
1630 .get(&role_id)
1631 .expect("catalog out of sync");
1632
1633 CatalogState::add_to_audit_log(
1634 &state.system_configuration,
1635 oracle_write_ts,
1636 session,
1637 tx,
1638 audit_events,
1639 EventType::Drop,
1640 ObjectType::Role,
1641 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1642 id: role.id.to_string(),
1643 name: role.name.clone(),
1644 }),
1645 )?;
1646 info!("drop role {}", role.name());
1647 }
1648
1649 tx.remove_network_policies(&delta.network_policies)?;
1651
1652 for network_policy_id in delta.network_policies {
1653 let policy = state
1654 .network_policies_by_id
1655 .get(&network_policy_id)
1656 .expect("catalog out of sync");
1657
1658 CatalogState::add_to_audit_log(
1659 &state.system_configuration,
1660 oracle_write_ts,
1661 session,
1662 tx,
1663 audit_events,
1664 EventType::Drop,
1665 ObjectType::NetworkPolicy,
1666 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1667 id: policy.id.to_string(),
1668 name: policy.name.clone(),
1669 }),
1670 )?;
1671 info!("drop network policy {}", policy.name.clone());
1672 }
1673
1674 let replicas = delta.replicas.keys().copied().collect();
1676 tx.remove_cluster_replicas(&replicas)?;
1677
1678 for (replica_id, (cluster_id, reason)) in delta.replicas {
1679 let cluster = state.get_cluster(cluster_id);
1680 let replica = cluster.replica(replica_id).expect("Must exist");
1681
1682 let (reason, scheduling_policies) = reason.into_audit_log();
1683 let details =
1684 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1685 cluster_id: cluster_id.to_string(),
1686 cluster_name: cluster.name.clone(),
1687 replica_id: Some(replica_id.to_string()),
1688 replica_name: replica.name.clone(),
1689 reason,
1690 scheduling_policies,
1691 });
1692 CatalogState::add_to_audit_log(
1693 &state.system_configuration,
1694 oracle_write_ts,
1695 session,
1696 tx,
1697 audit_events,
1698 EventType::Drop,
1699 ObjectType::ClusterReplica,
1700 details,
1701 )?;
1702 }
1703
1704 tx.remove_clusters(&delta.clusters)?;
1706
1707 for cluster_id in delta.clusters {
1708 let cluster = state.get_cluster(cluster_id);
1709
1710 CatalogState::add_to_audit_log(
1711 &state.system_configuration,
1712 oracle_write_ts,
1713 session,
1714 tx,
1715 audit_events,
1716 EventType::Drop,
1717 ObjectType::Cluster,
1718 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1719 id: cluster.id.to_string(),
1720 name: cluster.name.clone(),
1721 }),
1722 )?;
1723 }
1724 }
1725 Op::GrantRole {
1726 role_id,
1727 member_id,
1728 grantor_id,
1729 } => {
1730 state.ensure_not_reserved_role(&member_id)?;
1731 state.ensure_grantable_role(&role_id)?;
1732 if state.collect_role_membership(&role_id).contains(&member_id) {
1733 let group_role = state.get_role(&role_id);
1734 let member_role = state.get_role(&member_id);
1735 return Err(AdapterError::Catalog(Error::new(
1736 ErrorKind::CircularRoleMembership {
1737 role_name: group_role.name().to_string(),
1738 member_name: member_role.name().to_string(),
1739 },
1740 )));
1741 }
1742 let mut member_role = state.get_role(&member_id).clone();
1743 member_role.membership.map.insert(role_id, grantor_id);
1744 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1745
1746 CatalogState::add_to_audit_log(
1747 &state.system_configuration,
1748 oracle_write_ts,
1749 session,
1750 tx,
1751 audit_events,
1752 EventType::Grant,
1753 ObjectType::Role,
1754 EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1755 role_id: role_id.to_string(),
1756 member_id: member_id.to_string(),
1757 grantor_id: grantor_id.to_string(),
1758 executed_by: session
1759 .map(|session| session.authenticated_role_id())
1760 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1761 .to_string(),
1762 }),
1763 )?;
1764 }
1765 Op::RevokeRole {
1766 role_id,
1767 member_id,
1768 grantor_id,
1769 } => {
1770 state.ensure_not_reserved_role(&member_id)?;
1771 state.ensure_grantable_role(&role_id)?;
1772 let mut member_role = state.get_role(&member_id).clone();
1773 member_role.membership.map.remove(&role_id);
1774 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1775
1776 CatalogState::add_to_audit_log(
1777 &state.system_configuration,
1778 oracle_write_ts,
1779 session,
1780 tx,
1781 audit_events,
1782 EventType::Revoke,
1783 ObjectType::Role,
1784 EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1785 role_id: role_id.to_string(),
1786 member_id: member_id.to_string(),
1787 grantor_id: grantor_id.to_string(),
1788 executed_by: session
1789 .map(|session| session.authenticated_role_id())
1790 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1791 .to_string(),
1792 }),
1793 )?;
1794 }
1795 Op::UpdatePrivilege {
1796 target_id,
1797 privilege,
1798 variant,
1799 } => {
1800 let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1801 UpdatePrivilegeVariant::Grant => {
1802 privileges.grant(privilege);
1803 }
1804 UpdatePrivilegeVariant::Revoke => {
1805 privileges.revoke(&privilege);
1806 }
1807 };
1808 match &target_id {
1809 SystemObjectId::Object(object_id) => match object_id {
1810 ObjectId::Cluster(id) => {
1811 let mut cluster = state.get_cluster(*id).clone();
1812 update_privilege_fn(&mut cluster.privileges);
1813 tx.update_cluster(*id, cluster.into())?;
1814 }
1815 ObjectId::Database(id) => {
1816 let mut database = state.get_database(id).clone();
1817 update_privilege_fn(&mut database.privileges);
1818 tx.update_database(*id, database.into())?;
1819 }
1820 ObjectId::NetworkPolicy(id) => {
1821 let mut policy = state.get_network_policy(id).clone();
1822 update_privilege_fn(&mut policy.privileges);
1823 tx.update_network_policy(*id, policy.into())?;
1824 }
1825 ObjectId::Schema((database_spec, schema_spec)) => {
1826 let schema_id = schema_spec.clone().into();
1827 let mut schema = state
1828 .get_schema(
1829 database_spec,
1830 schema_spec,
1831 session
1832 .map(|session| session.conn_id())
1833 .unwrap_or(&SYSTEM_CONN_ID),
1834 )
1835 .clone();
1836 update_privilege_fn(&mut schema.privileges);
1837 tx.update_schema(schema_id, schema.into())?;
1838 }
1839 ObjectId::Item(id) => {
1840 let entry = state.get_entry(id);
1841 let mut new_entry = entry.clone();
1842 update_privilege_fn(&mut new_entry.privileges);
1843 if !new_entry.item().is_temporary() {
1844 tx.update_item(*id, new_entry.into())?;
1845 } else {
1846 temporary_item_updates
1847 .push((entry.clone().into(), StateDiff::Retraction));
1848 temporary_item_updates
1849 .push((new_entry.into(), StateDiff::Addition));
1850 }
1851 }
1852 ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
1853 },
1854 SystemObjectId::System => {
1855 let mut system_privileges = state.system_privileges.clone();
1856 update_privilege_fn(&mut system_privileges);
1857 let new_privilege =
1858 system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
1859 tx.set_system_privilege(
1860 privilege.grantee,
1861 privilege.grantor,
1862 new_privilege.map(|new_privilege| new_privilege.acl_mode),
1863 )?;
1864 }
1865 }
1866 let object_type = state.get_system_object_type(&target_id);
1867 let object_id_str = match &target_id {
1868 SystemObjectId::System => "SYSTEM".to_string(),
1869 SystemObjectId::Object(id) => id.to_string(),
1870 };
1871 CatalogState::add_to_audit_log(
1872 &state.system_configuration,
1873 oracle_write_ts,
1874 session,
1875 tx,
1876 audit_events,
1877 variant.into(),
1878 system_object_type_to_audit_object_type(&object_type),
1879 EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
1880 object_id: object_id_str,
1881 grantee_id: privilege.grantee.to_string(),
1882 grantor_id: privilege.grantor.to_string(),
1883 privileges: privilege.acl_mode.to_string(),
1884 }),
1885 )?;
1886 }
1887 Op::UpdateDefaultPrivilege {
1888 privilege_object,
1889 privilege_acl_item,
1890 variant,
1891 } => {
1892 let mut default_privileges = state.default_privileges.clone();
1893 match variant {
1894 UpdatePrivilegeVariant::Grant => default_privileges
1895 .grant(privilege_object.clone(), privilege_acl_item.clone()),
1896 UpdatePrivilegeVariant::Revoke => {
1897 default_privileges.revoke(&privilege_object, &privilege_acl_item)
1898 }
1899 }
1900 let new_acl_mode = default_privileges
1901 .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
1902 tx.set_default_privilege(
1903 privilege_object.role_id,
1904 privilege_object.database_id,
1905 privilege_object.schema_id,
1906 privilege_object.object_type,
1907 privilege_acl_item.grantee,
1908 new_acl_mode.cloned(),
1909 )?;
1910 CatalogState::add_to_audit_log(
1911 &state.system_configuration,
1912 oracle_write_ts,
1913 session,
1914 tx,
1915 audit_events,
1916 variant.into(),
1917 object_type_to_audit_object_type(privilege_object.object_type),
1918 EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
1919 role_id: privilege_object.role_id.to_string(),
1920 database_id: privilege_object.database_id.map(|id| id.to_string()),
1921 schema_id: privilege_object.schema_id.map(|id| id.to_string()),
1922 grantee_id: privilege_acl_item.grantee.to_string(),
1923 privileges: privilege_acl_item.acl_mode.to_string(),
1924 }),
1925 )?;
1926 }
1927 Op::RenameCluster {
1928 id,
1929 name,
1930 to_name,
1931 check_reserved_names,
1932 } => {
1933 if id.is_system() {
1934 return Err(AdapterError::Catalog(Error::new(
1935 ErrorKind::ReadOnlyCluster(name.clone()),
1936 )));
1937 }
1938 if check_reserved_names && is_reserved_name(&to_name) {
1939 return Err(AdapterError::Catalog(Error::new(
1940 ErrorKind::ReservedClusterName(to_name),
1941 )));
1942 }
1943 tx.rename_cluster(id, &name, &to_name)?;
1944 CatalogState::add_to_audit_log(
1945 &state.system_configuration,
1946 oracle_write_ts,
1947 session,
1948 tx,
1949 audit_events,
1950 EventType::Alter,
1951 ObjectType::Cluster,
1952 EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
1953 id: id.to_string(),
1954 old_name: name.clone(),
1955 new_name: to_name.clone(),
1956 }),
1957 )?;
1958 info!("rename cluster {name} to {to_name}");
1959 }
1960 Op::RenameClusterReplica {
1961 cluster_id,
1962 replica_id,
1963 name,
1964 to_name,
1965 } => {
1966 if is_reserved_name(&to_name) {
1967 return Err(AdapterError::Catalog(Error::new(
1968 ErrorKind::ReservedReplicaName(to_name),
1969 )));
1970 }
1971 tx.rename_cluster_replica(replica_id, &name, &to_name)?;
1972 CatalogState::add_to_audit_log(
1973 &state.system_configuration,
1974 oracle_write_ts,
1975 session,
1976 tx,
1977 audit_events,
1978 EventType::Alter,
1979 ObjectType::ClusterReplica,
1980 EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
1981 cluster_id: cluster_id.to_string(),
1982 replica_id: replica_id.to_string(),
1983 old_name: name.replica.as_str().to_string(),
1984 new_name: to_name.clone(),
1985 }),
1986 )?;
1987 info!("rename cluster replica {name} to {to_name}");
1988 }
1989 Op::RenameItem {
1990 id,
1991 to_name,
1992 current_full_name,
1993 } => {
1994 let mut updates = Vec::new();
1995
1996 let entry = state.get_entry(&id);
1997 if let CatalogItem::Type(_) = entry.item() {
1998 return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
1999 current_full_name.to_string(),
2000 ))));
2001 }
2002
2003 if entry.id().is_system() {
2004 let name = state
2005 .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2006 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2007 name.to_string(),
2008 ))));
2009 }
2010
2011 let mut to_full_name = current_full_name.clone();
2012 to_full_name.item.clone_from(&to_name);
2013
2014 let mut to_qualified_name = entry.name().clone();
2015 to_qualified_name.item.clone_from(&to_name);
2016
2017 let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2018 id: id.to_string(),
2019 old_name: Self::full_name_detail(¤t_full_name),
2020 new_name: Self::full_name_detail(&to_full_name),
2021 });
2022 if Self::should_audit_log_item(entry.item()) {
2023 CatalogState::add_to_audit_log(
2024 &state.system_configuration,
2025 oracle_write_ts,
2026 session,
2027 tx,
2028 audit_events,
2029 EventType::Alter,
2030 catalog_type_to_audit_object_type(entry.item().typ()),
2031 details,
2032 )?;
2033 }
2034
2035 let mut new_entry = entry.clone();
2037 new_entry.name.item.clone_from(&to_name);
2038 new_entry.item = entry
2039 .item()
2040 .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2041 .map_err(|e| {
2042 Error::new(ErrorKind::from(AmbiguousRename {
2043 depender: state
2044 .resolve_full_name(entry.name(), entry.conn_id())
2045 .to_string(),
2046 dependee: state
2047 .resolve_full_name(entry.name(), entry.conn_id())
2048 .to_string(),
2049 message: e,
2050 }))
2051 })?;
2052
2053 for id in entry.referenced_by() {
2054 let dependent_item = state.get_entry(id);
2055 let mut to_entry = dependent_item.clone();
2056 to_entry.item = dependent_item
2057 .item()
2058 .rename_item_refs(
2059 current_full_name.clone(),
2060 to_full_name.item.clone(),
2061 false,
2062 )
2063 .map_err(|e| {
2064 Error::new(ErrorKind::from(AmbiguousRename {
2065 depender: state
2066 .resolve_full_name(
2067 dependent_item.name(),
2068 dependent_item.conn_id(),
2069 )
2070 .to_string(),
2071 dependee: state
2072 .resolve_full_name(entry.name(), entry.conn_id())
2073 .to_string(),
2074 message: e,
2075 }))
2076 })?;
2077
2078 if !to_entry.item().is_temporary() {
2079 tx.update_item(*id, to_entry.into())?;
2080 } else {
2081 temporary_item_updates
2082 .push((dependent_item.clone().into(), StateDiff::Retraction));
2083 temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2084 }
2085 updates.push(*id);
2086 }
2087 if !new_entry.item().is_temporary() {
2088 tx.update_item(id, new_entry.into())?;
2089 } else {
2090 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2091 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2092 }
2093
2094 updates.push(id);
2095 for id in updates {
2096 Self::log_update(state, &id);
2097 }
2098 }
2099 Op::RenameSchema {
2100 database_spec,
2101 schema_spec,
2102 new_name,
2103 check_reserved_names,
2104 } => {
2105 if check_reserved_names && is_reserved_name(&new_name) {
2106 return Err(AdapterError::Catalog(Error::new(
2107 ErrorKind::ReservedSchemaName(new_name),
2108 )));
2109 }
2110
2111 let conn_id = session
2112 .map(|session| session.conn_id())
2113 .unwrap_or(&SYSTEM_CONN_ID);
2114
2115 let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2116 let cur_name = schema.name().schema.clone();
2117
2118 let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2119 return Err(AdapterError::Catalog(Error::new(
2120 ErrorKind::AmbientSchemaRename(cur_name),
2121 )));
2122 };
2123 let database = state.get_database(&database_id);
2124 let database_name = &database.name;
2125
2126 let mut updates = Vec::new();
2127 let mut items_to_update = BTreeMap::new();
2128
2129 let mut update_item = |id| {
2130 if items_to_update.contains_key(id) {
2131 return Ok(());
2132 }
2133
2134 let entry = state.get_entry(id);
2135
2136 let mut new_entry = entry.clone();
2138 new_entry.item = entry
2139 .item
2140 .rename_schema_refs(database_name, &cur_name, &new_name)
2141 .map_err(|(s, _i)| {
2142 Error::new(ErrorKind::from(AmbiguousRename {
2143 depender: state
2144 .resolve_full_name(entry.name(), entry.conn_id())
2145 .to_string(),
2146 dependee: format!("{database_name}.{cur_name}"),
2147 message: format!("ambiguous reference to schema named {s}"),
2148 }))
2149 })?;
2150
2151 if !new_entry.item().is_temporary() {
2153 items_to_update.insert(*id, new_entry.into());
2154 } else {
2155 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2156 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2157 }
2158 updates.push(id);
2159
2160 Ok::<_, AdapterError>(())
2161 };
2162
2163 for (_name, item_id) in &schema.items {
2165 update_item(item_id)?;
2167
2168 for id in state.get_entry(item_id).referenced_by() {
2170 update_item(id)?;
2171 }
2172 }
2173 tx.update_items(items_to_update)?;
2176
2177 let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2179 let schema_name = schema.name().schema.clone();
2180 return Err(AdapterError::Catalog(crate::catalog::Error::new(
2181 crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2182 )));
2183 };
2184
2185 let database_name = database_spec
2187 .id()
2188 .map(|id| state.get_database(&id).name.clone());
2189 let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2190 id: schema_id.to_string(),
2191 old_name: schema.name().schema.clone(),
2192 new_name: new_name.clone(),
2193 database_name,
2194 });
2195 CatalogState::add_to_audit_log(
2196 &state.system_configuration,
2197 oracle_write_ts,
2198 session,
2199 tx,
2200 audit_events,
2201 EventType::Alter,
2202 mz_audit_log::ObjectType::Schema,
2203 details,
2204 )?;
2205
2206 let mut new_schema = schema.clone();
2208 new_schema.name.schema.clone_from(&new_name);
2209 tx.update_schema(schema_id, new_schema.into())?;
2210
2211 for id in updates {
2212 Self::log_update(state, id);
2213 }
2214 }
2215 Op::UpdateOwner { id, new_owner } => {
2216 let conn_id = session
2217 .map(|session| session.conn_id())
2218 .unwrap_or(&SYSTEM_CONN_ID);
2219 let old_owner = state
2220 .get_owner_id(&id, conn_id)
2221 .expect("cannot update the owner of an object without an owner");
2222 match &id {
2223 ObjectId::Cluster(id) => {
2224 let mut cluster = state.get_cluster(*id).clone();
2225 if id.is_system() {
2226 return Err(AdapterError::Catalog(Error::new(
2227 ErrorKind::ReadOnlyCluster(cluster.name),
2228 )));
2229 }
2230 Self::update_privilege_owners(
2231 &mut cluster.privileges,
2232 cluster.owner_id,
2233 new_owner,
2234 );
2235 cluster.owner_id = new_owner;
2236 tx.update_cluster(*id, cluster.into())?;
2237 }
2238 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2239 let cluster = state.get_cluster(*cluster_id);
2240 let mut replica = cluster
2241 .replica(*replica_id)
2242 .expect("catalog out of sync")
2243 .clone();
2244 if replica_id.is_system() {
2245 return Err(AdapterError::Catalog(Error::new(
2246 ErrorKind::ReadOnlyClusterReplica(replica.name),
2247 )));
2248 }
2249 replica.owner_id = new_owner;
2250 tx.update_cluster_replica(*replica_id, replica.into())?;
2251 }
2252 ObjectId::Database(id) => {
2253 let mut database = state.get_database(id).clone();
2254 if id.is_system() {
2255 return Err(AdapterError::Catalog(Error::new(
2256 ErrorKind::ReadOnlyDatabase(database.name),
2257 )));
2258 }
2259 Self::update_privilege_owners(
2260 &mut database.privileges,
2261 database.owner_id,
2262 new_owner,
2263 );
2264 database.owner_id = new_owner;
2265 tx.update_database(*id, database.clone().into())?;
2266 }
2267 ObjectId::Schema((database_spec, schema_spec)) => {
2268 let schema_id: SchemaId = schema_spec.clone().into();
2269 let mut schema = state
2270 .get_schema(database_spec, schema_spec, conn_id)
2271 .clone();
2272 if schema_id.is_system() {
2273 let name = schema.name();
2274 let full_name = state.resolve_full_schema_name(name);
2275 return Err(AdapterError::Catalog(Error::new(
2276 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2277 )));
2278 }
2279 Self::update_privilege_owners(
2280 &mut schema.privileges,
2281 schema.owner_id,
2282 new_owner,
2283 );
2284 schema.owner_id = new_owner;
2285 tx.update_schema(schema_id, schema.into())?;
2286 }
2287 ObjectId::Item(id) => {
2288 let entry = state.get_entry(id);
2289 let mut new_entry = entry.clone();
2290 if id.is_system() {
2291 let full_name = state.resolve_full_name(
2292 new_entry.name(),
2293 session.map(|session| session.conn_id()),
2294 );
2295 return Err(AdapterError::Catalog(Error::new(
2296 ErrorKind::ReadOnlyItem(full_name.to_string()),
2297 )));
2298 }
2299 Self::update_privilege_owners(
2300 &mut new_entry.privileges,
2301 new_entry.owner_id,
2302 new_owner,
2303 );
2304 new_entry.owner_id = new_owner;
2305 if !new_entry.item().is_temporary() {
2306 tx.update_item(*id, new_entry.into())?;
2307 } else {
2308 temporary_item_updates
2309 .push((entry.clone().into(), StateDiff::Retraction));
2310 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2311 }
2312 }
2313 ObjectId::NetworkPolicy(id) => {
2314 let mut policy = state.get_network_policy(id).clone();
2315 if id.is_system() {
2316 return Err(AdapterError::Catalog(Error::new(
2317 ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2318 )));
2319 }
2320 Self::update_privilege_owners(
2321 &mut policy.privileges,
2322 policy.owner_id,
2323 new_owner,
2324 );
2325 policy.owner_id = new_owner;
2326 tx.update_network_policy(*id, policy.into())?;
2327 }
2328 ObjectId::Role(_) => unreachable!("roles have no owner"),
2329 }
2330 let object_type = state.get_object_type(&id);
2331 CatalogState::add_to_audit_log(
2332 &state.system_configuration,
2333 oracle_write_ts,
2334 session,
2335 tx,
2336 audit_events,
2337 EventType::Alter,
2338 object_type_to_audit_object_type(object_type),
2339 EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2340 object_id: id.to_string(),
2341 old_owner_id: old_owner.to_string(),
2342 new_owner_id: new_owner.to_string(),
2343 }),
2344 )?;
2345 }
2346 Op::UpdateClusterConfig { id, name, config } => {
2347 let mut cluster = state.get_cluster(id).clone();
2348 cluster.config = config;
2349 tx.update_cluster(id, cluster.into())?;
2350 info!("update cluster {}", name);
2351
2352 CatalogState::add_to_audit_log(
2353 &state.system_configuration,
2354 oracle_write_ts,
2355 session,
2356 tx,
2357 audit_events,
2358 EventType::Alter,
2359 ObjectType::Cluster,
2360 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2361 id: id.to_string(),
2362 name,
2363 }),
2364 )?;
2365 }
2366 Op::UpdateClusterReplicaConfig {
2367 replica_id,
2368 cluster_id,
2369 config,
2370 } => {
2371 let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2372 info!("update replica {}", replica.name);
2373 tx.update_cluster_replica(
2374 replica_id,
2375 mz_catalog::durable::ClusterReplica {
2376 cluster_id,
2377 replica_id,
2378 name: replica.name.clone(),
2379 config: config.clone().into(),
2380 owner_id: replica.owner_id,
2381 },
2382 )?;
2383 }
2384 Op::UpdateItem { id, name, to_item } => {
2385 let mut entry = state.get_entry(&id).clone();
2386 entry.name = name.clone();
2387 entry.item = to_item.clone();
2388 tx.update_item(id, entry.into())?;
2389
2390 if Self::should_audit_log_item(&to_item) {
2391 let mut full_name = Self::full_name_detail(
2392 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2393 );
2394 full_name.item = name.item;
2395
2396 CatalogState::add_to_audit_log(
2397 &state.system_configuration,
2398 oracle_write_ts,
2399 session,
2400 tx,
2401 audit_events,
2402 EventType::Alter,
2403 catalog_type_to_audit_object_type(to_item.typ()),
2404 EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2405 id: id.to_string(),
2406 name: full_name,
2407 }),
2408 )?;
2409 }
2410
2411 Self::log_update(state, &id);
2412 }
2413 Op::UpdateSystemConfiguration { name, value } => {
2414 let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2415 tx.upsert_system_config(&name, parsed_value.clone())?;
2416 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2421 let with_0dt_deployment_max_wait =
2422 Duration::parse(VarInput::Flat(&parsed_value))
2423 .expect("parsing succeeded above");
2424 tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2425 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2426 let with_0dt_deployment_ddl_check_interval =
2427 Duration::parse(VarInput::Flat(&parsed_value))
2428 .expect("parsing succeeded above");
2429 tx.set_0dt_deployment_ddl_check_interval(
2430 with_0dt_deployment_ddl_check_interval,
2431 )?;
2432 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2433 let panic_after_timeout =
2434 strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2435 tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2436 }
2437
2438 CatalogState::add_to_audit_log(
2439 &state.system_configuration,
2440 oracle_write_ts,
2441 session,
2442 tx,
2443 audit_events,
2444 EventType::Alter,
2445 ObjectType::System,
2446 EventDetails::SetV1(mz_audit_log::SetV1 {
2447 name,
2448 value: Some(value.borrow().to_vec().join(", ")),
2449 }),
2450 )?;
2451 }
2452 Op::ResetSystemConfiguration { name } => {
2453 tx.remove_system_config(&name);
2454 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2459 tx.reset_0dt_deployment_max_wait()?;
2460 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2461 tx.reset_0dt_deployment_ddl_check_interval()?;
2462 }
2463
2464 CatalogState::add_to_audit_log(
2465 &state.system_configuration,
2466 oracle_write_ts,
2467 session,
2468 tx,
2469 audit_events,
2470 EventType::Alter,
2471 ObjectType::System,
2472 EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2473 )?;
2474 }
2475 Op::ResetAllSystemConfiguration => {
2476 tx.clear_system_configs();
2477 tx.reset_0dt_deployment_max_wait()?;
2478 tx.reset_0dt_deployment_ddl_check_interval()?;
2479
2480 CatalogState::add_to_audit_log(
2481 &state.system_configuration,
2482 oracle_write_ts,
2483 session,
2484 tx,
2485 audit_events,
2486 EventType::Alter,
2487 ObjectType::System,
2488 EventDetails::ResetAllV1,
2489 )?;
2490 }
2491 Op::WeirdStorageUsageUpdates {
2492 object_id,
2493 size_bytes,
2494 collection_timestamp,
2495 } => {
2496 let id = tx.allocate_storage_usage_ids()?;
2497 let metric =
2498 VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2499 let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2500 let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2501 weird_builtin_table_update = Some(builtin_table_update);
2502 }
2503 };
2504 Ok((weird_builtin_table_update, temporary_item_updates))
2505 }
2506
2507 fn log_update(state: &CatalogState, id: &CatalogItemId) {
2508 let entry = state.get_entry(id);
2509 info!(
2510 "update {} {} ({})",
2511 entry.item_type(),
2512 state.resolve_full_name(entry.name(), entry.conn_id()),
2513 id
2514 );
2515 }
2516
2517 fn update_privilege_owners(
2521 privileges: &mut PrivilegeMap,
2522 old_owner: RoleId,
2523 new_owner: RoleId,
2524 ) {
2525 let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2527
2528 let mut new_present = false;
2529 for privilege in flat_privileges.iter_mut() {
2530 if privilege.grantor == old_owner {
2533 privilege.grantor = new_owner;
2534 } else if privilege.grantor == new_owner {
2535 new_present = true;
2536 }
2537 if privilege.grantee == old_owner {
2539 privilege.grantee = new_owner;
2540 } else if privilege.grantee == new_owner {
2541 new_present = true;
2542 }
2543 }
2544
2545 if new_present {
2549 let privilege_map: BTreeMap<_, Vec<_>> =
2551 flat_privileges
2552 .into_iter()
2553 .fold(BTreeMap::new(), |mut accum, privilege| {
2554 accum
2555 .entry((privilege.grantee, privilege.grantor))
2556 .or_default()
2557 .push(privilege);
2558 accum
2559 });
2560
2561 flat_privileges = privilege_map
2563 .into_iter()
2564 .map(|((grantee, grantor), values)|
2565 values.into_iter().fold(
2567 MzAclItem::empty(grantee, grantor),
2568 |mut accum, mz_aclitem| {
2569 accum.acl_mode =
2570 accum.acl_mode.union(mz_aclitem.acl_mode);
2571 accum
2572 },
2573 ))
2574 .collect();
2575 }
2576
2577 *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2578 }
2579}
2580
2581fn apply_replacement_audit_events(
2583 state: &CatalogState,
2584 target: &CatalogEntry,
2585 replacement: &CatalogEntry,
2586) -> Vec<(EventType, EventDetails)> {
2587 let mut events = Vec::new();
2588
2589 let target_name = state.resolve_full_name(target.name(), target.conn_id());
2590 let target_id_name = IdFullNameV1 {
2591 id: target.id().to_string(),
2592 name: Catalog::full_name_detail(&target_name),
2593 };
2594 let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2595 let replacement_id_name = IdFullNameV1 {
2596 id: replacement.id().to_string(),
2597 name: Catalog::full_name_detail(&replacement_name),
2598 };
2599
2600 if Catalog::should_audit_log_item(&target.item) {
2601 events.push((
2602 EventType::Alter,
2603 EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2604 target: target_id_name.clone(),
2605 replacement: replacement_id_name.clone(),
2606 }),
2607 ));
2608
2609 if let Some(old_cluster_id) = target.cluster_id()
2610 && let Some(new_cluster_id) = replacement.cluster_id()
2611 && old_cluster_id != new_cluster_id
2612 {
2613 events.push((
2614 EventType::Alter,
2615 EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
2616 id: target.id().to_string(),
2617 name: target_id_name.name,
2618 old_cluster_id: old_cluster_id.to_string(),
2619 new_cluster_id: new_cluster_id.to_string(),
2620 }),
2621 ));
2622 }
2623 }
2624
2625 if Catalog::should_audit_log_item(&replacement.item) {
2626 events.push((
2627 EventType::Drop,
2628 EventDetails::IdFullNameV1(replacement_id_name),
2629 ));
2630 }
2631
2632 events
2633}
2634
2635#[derive(Debug, Default)]
2643pub(crate) struct ObjectsToDrop {
2644 pub comments: BTreeSet<CommentObjectId>,
2645 pub databases: BTreeSet<DatabaseId>,
2646 pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2647 pub clusters: BTreeSet<ClusterId>,
2648 pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2649 pub roles: BTreeSet<RoleId>,
2650 pub items: Vec<CatalogItemId>,
2651 pub network_policies: BTreeSet<NetworkPolicyId>,
2652}
2653
2654impl ObjectsToDrop {
2655 pub fn generate(
2656 drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2657 state: &CatalogState,
2658 session: Option<&ConnMeta>,
2659 ) -> Result<Self, AdapterError> {
2660 let mut delta = ObjectsToDrop::default();
2661
2662 for drop_object_info in drop_object_infos {
2663 delta.add_item(drop_object_info, state, session)?;
2664 }
2665
2666 Ok(delta)
2667 }
2668
2669 fn add_item(
2670 &mut self,
2671 drop_object_info: DropObjectInfo,
2672 state: &CatalogState,
2673 session: Option<&ConnMeta>,
2674 ) -> Result<(), AdapterError> {
2675 self.comments
2676 .insert(state.get_comment_id(drop_object_info.to_object_id()));
2677
2678 match drop_object_info {
2679 DropObjectInfo::Database(database_id) => {
2680 let database = &state.database_by_id[&database_id];
2681 if database_id.is_system() {
2682 return Err(AdapterError::Catalog(Error::new(
2683 ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2684 )));
2685 }
2686
2687 self.databases.insert(database_id);
2688 }
2689 DropObjectInfo::Schema((database_spec, schema_spec)) => {
2690 let schema = state.get_schema(
2691 &database_spec,
2692 &schema_spec,
2693 session
2694 .map(|session| session.conn_id())
2695 .unwrap_or(&SYSTEM_CONN_ID),
2696 );
2697 let schema_id: SchemaId = schema_spec.into();
2698 if schema_id.is_system() {
2699 let name = schema.name();
2700 let full_name = state.resolve_full_schema_name(name);
2701 return Err(AdapterError::Catalog(Error::new(
2702 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2703 )));
2704 }
2705
2706 self.schemas.insert(schema_spec, database_spec);
2707 }
2708 DropObjectInfo::Role(role_id) => {
2709 let name = state.get_role(&role_id).name().to_string();
2710 if role_id.is_system() || role_id.is_predefined() {
2711 return Err(AdapterError::Catalog(Error::new(
2712 ErrorKind::ReservedRoleName(name.clone()),
2713 )));
2714 }
2715 state.ensure_not_reserved_role(&role_id)?;
2716
2717 self.roles.insert(role_id);
2718 }
2719 DropObjectInfo::Cluster(cluster_id) => {
2720 let cluster = state.get_cluster(cluster_id);
2721 let name = &cluster.name;
2722 if cluster_id.is_system() {
2723 return Err(AdapterError::Catalog(Error::new(
2724 ErrorKind::ReadOnlyCluster(name.clone()),
2725 )));
2726 }
2727
2728 self.clusters.insert(cluster_id);
2729 }
2730 DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2731 let cluster = state.get_cluster(cluster_id);
2732 let replica = cluster.replica(replica_id).expect("Must exist");
2733
2734 self.replicas
2735 .insert(replica.replica_id, (cluster.id, reason));
2736 }
2737 DropObjectInfo::Item(item_id) => {
2738 let entry = state.get_entry(&item_id);
2739 if item_id.is_system() {
2740 let name = entry.name();
2741 let full_name =
2742 state.resolve_full_name(name, session.map(|session| session.conn_id()));
2743 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2744 full_name.to_string(),
2745 ))));
2746 }
2747
2748 self.items.push(item_id);
2749 }
2750 DropObjectInfo::NetworkPolicy(network_policy_id) => {
2751 let policy = state.get_network_policy(&network_policy_id);
2752 let name = &policy.name;
2753 if network_policy_id.is_system() {
2754 return Err(AdapterError::Catalog(Error::new(
2755 ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
2756 )));
2757 }
2758
2759 self.network_policies.insert(network_policy_id);
2760 }
2761 }
2762
2763 Ok(())
2764 }
2765}
2766
2767#[cfg(test)]
2768mod tests {
2769 use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
2770 use mz_repr::role_id::RoleId;
2771
2772 use crate::catalog::Catalog;
2773
2774 #[mz_ore::test]
2775 fn test_update_privilege_owners() {
2776 let old_owner = RoleId::User(1);
2777 let new_owner = RoleId::User(2);
2778 let other_role = RoleId::User(3);
2779
2780 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2782 MzAclItem {
2783 grantee: other_role,
2784 grantor: old_owner,
2785 acl_mode: AclMode::UPDATE,
2786 },
2787 MzAclItem {
2788 grantee: other_role,
2789 grantor: new_owner,
2790 acl_mode: AclMode::SELECT,
2791 },
2792 ]);
2793 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2794 assert_eq!(1, privileges.all_values().count());
2795 assert_eq!(
2796 vec![MzAclItem {
2797 grantee: other_role,
2798 grantor: new_owner,
2799 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2800 }],
2801 privileges.all_values_owned().collect::<Vec<_>>()
2802 );
2803
2804 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2806 MzAclItem {
2807 grantee: old_owner,
2808 grantor: other_role,
2809 acl_mode: AclMode::UPDATE,
2810 },
2811 MzAclItem {
2812 grantee: new_owner,
2813 grantor: other_role,
2814 acl_mode: AclMode::SELECT,
2815 },
2816 ]);
2817 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2818 assert_eq!(1, privileges.all_values().count());
2819 assert_eq!(
2820 vec![MzAclItem {
2821 grantee: new_owner,
2822 grantor: other_role,
2823 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2824 }],
2825 privileges.all_values_owned().collect::<Vec<_>>()
2826 );
2827
2828 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2830 MzAclItem {
2831 grantee: old_owner,
2832 grantor: old_owner,
2833 acl_mode: AclMode::UPDATE,
2834 },
2835 MzAclItem {
2836 grantee: new_owner,
2837 grantor: new_owner,
2838 acl_mode: AclMode::SELECT,
2839 },
2840 ]);
2841 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2842 assert_eq!(1, privileges.all_values().count());
2843 assert_eq!(
2844 vec![MzAclItem {
2845 grantee: new_owner,
2846 grantor: new_owner,
2847 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2848 }],
2849 privileges.all_values_owned().collect::<Vec<_>>()
2850 );
2851 }
2852}