1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22 WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26 ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Transaction};
31use mz_catalog::expr_cache::LocalExpressions;
32use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
33use mz_catalog::memory::objects::{
34 CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, SourceReferences, StateDiff,
35 StateUpdate, StateUpdateKind, TemporaryItem,
36};
37use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
38use mz_controller_types::{ClusterId, ReplicaId};
39use mz_ore::collections::HashSet;
40use mz_ore::instrument;
41use mz_ore::now::EpochMillis;
42use mz_persist_types::ShardId;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
44use mz_repr::network_policy_id::NetworkPolicyId;
45use mz_repr::optimize::OptimizerFeatures;
46use mz_repr::role_id::RoleId;
47use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
48use mz_sql::ast::RawDataType;
49use mz_sql::catalog::{
50 CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
51 CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, PasswordAction, PasswordConfig,
52 RoleAttributesRaw, RoleMembership, RoleVars,
53};
54use mz_sql::names::{
55 CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
56 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
57};
58use mz_sql::plan::{NetworkPolicyRule, PlanError};
59use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
60use mz_sql::session::vars::OwnedVarInput;
61use mz_sql::session::vars::{Value as VarValue, VarInput};
62use mz_sql::{DEFAULT_SCHEMA, rbac};
63use mz_sql_parser::ast::{QualifiedReplica, Value};
64use mz_storage_client::storage_collections::StorageCollections;
65use tracing::{info, trace};
66
67use crate::AdapterError;
68use crate::catalog::state::LocalExpressionCache;
69use crate::catalog::{
70 BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
71 catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
72 is_reserved_role_name, object_type_to_audit_object_type,
73 system_object_type_to_audit_object_type,
74};
75use crate::coord::ConnMeta;
76use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
77use crate::coord::cluster_scheduling::SchedulingDecision;
78use crate::util::ResultExt;
79
80#[derive(Debug, Clone)]
81pub enum Op {
82 AlterRetainHistory {
83 id: CatalogItemId,
84 value: Option<Value>,
85 window: CompactionWindow,
86 },
87 AlterRole {
88 id: RoleId,
89 name: String,
90 attributes: RoleAttributesRaw,
91 nopassword: bool,
92 vars: RoleVars,
93 },
94 AlterNetworkPolicy {
95 id: NetworkPolicyId,
96 rules: Vec<NetworkPolicyRule>,
97 name: String,
98 owner_id: RoleId,
99 },
100 AlterAddColumn {
101 id: CatalogItemId,
102 new_global_id: GlobalId,
103 name: ColumnName,
104 typ: SqlColumnType,
105 sql: RawDataType,
106 },
107 AlterMaterializedViewApplyReplacement {
108 id: CatalogItemId,
109 replacement_id: CatalogItemId,
110 },
111 CreateDatabase {
112 name: String,
113 owner_id: RoleId,
114 },
115 CreateSchema {
116 database_id: ResolvedDatabaseSpecifier,
117 schema_name: String,
118 owner_id: RoleId,
119 },
120 CreateRole {
121 name: String,
122 attributes: RoleAttributesRaw,
123 },
124 CreateCluster {
125 id: ClusterId,
126 name: String,
127 introspection_sources: Vec<&'static BuiltinLog>,
128 owner_id: RoleId,
129 config: ClusterConfig,
130 },
131 CreateClusterReplica {
132 cluster_id: ClusterId,
133 name: String,
134 config: ReplicaConfig,
135 owner_id: RoleId,
136 reason: ReplicaCreateDropReason,
137 },
138 CreateItem {
139 id: CatalogItemId,
140 name: QualifiedItemName,
141 item: CatalogItem,
142 owner_id: RoleId,
143 },
144 CreateNetworkPolicy {
145 rules: Vec<NetworkPolicyRule>,
146 name: String,
147 owner_id: RoleId,
148 },
149 Comment {
150 object_id: CommentObjectId,
151 sub_component: Option<usize>,
152 comment: Option<String>,
153 },
154 DropObjects(Vec<DropObjectInfo>),
155 GrantRole {
156 role_id: RoleId,
157 member_id: RoleId,
158 grantor_id: RoleId,
159 },
160 RenameCluster {
161 id: ClusterId,
162 name: String,
163 to_name: String,
164 check_reserved_names: bool,
165 },
166 RenameClusterReplica {
167 cluster_id: ClusterId,
168 replica_id: ReplicaId,
169 name: QualifiedReplica,
170 to_name: String,
171 },
172 RenameItem {
173 id: CatalogItemId,
174 current_full_name: FullItemName,
175 to_name: String,
176 },
177 RenameSchema {
178 database_spec: ResolvedDatabaseSpecifier,
179 schema_spec: SchemaSpecifier,
180 new_name: String,
181 check_reserved_names: bool,
182 },
183 UpdateOwner {
184 id: ObjectId,
185 new_owner: RoleId,
186 },
187 UpdatePrivilege {
188 target_id: SystemObjectId,
189 privilege: MzAclItem,
190 variant: UpdatePrivilegeVariant,
191 },
192 UpdateDefaultPrivilege {
193 privilege_object: DefaultPrivilegeObject,
194 privilege_acl_item: DefaultPrivilegeAclItem,
195 variant: UpdatePrivilegeVariant,
196 },
197 RevokeRole {
198 role_id: RoleId,
199 member_id: RoleId,
200 grantor_id: RoleId,
201 },
202 UpdateClusterConfig {
203 id: ClusterId,
204 name: String,
205 config: ClusterConfig,
206 },
207 UpdateClusterReplicaConfig {
208 cluster_id: ClusterId,
209 replica_id: ReplicaId,
210 config: ReplicaConfig,
211 },
212 UpdateItem {
213 id: CatalogItemId,
214 name: QualifiedItemName,
215 to_item: CatalogItem,
216 },
217 UpdateSourceReferences {
218 source_id: CatalogItemId,
219 references: SourceReferences,
220 },
221 UpdateSystemConfiguration {
222 name: String,
223 value: OwnedVarInput,
224 },
225 ResetSystemConfiguration {
226 name: String,
227 },
228 ResetAllSystemConfiguration,
229 WeirdStorageUsageUpdates {
236 object_id: Option<String>,
237 size_bytes: u64,
238 collection_timestamp: EpochMillis,
239 },
240 TransactionDryRun,
246}
247
248#[derive(Debug, Clone)]
252pub enum DropObjectInfo {
253 Cluster(ClusterId),
254 ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
255 Database(DatabaseId),
256 Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
257 Role(RoleId),
258 Item(CatalogItemId),
259 NetworkPolicy(NetworkPolicyId),
260}
261
262impl DropObjectInfo {
263 pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
266 match id {
267 ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
268 ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
269 cluster_id,
270 replica_id,
271 ReplicaCreateDropReason::Manual,
272 )),
273 ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
274 ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
275 ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
276 ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
277 ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
278 }
279 }
280
281 fn to_object_id(&self) -> ObjectId {
284 match &self {
285 DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
286 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
287 ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
288 }
289 DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
290 DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
291 DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
292 DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
293 DropObjectInfo::NetworkPolicy(network_policy_id) => {
294 ObjectId::NetworkPolicy(network_policy_id.clone())
295 }
296 }
297 }
298}
299
300#[derive(Debug, Clone)]
302pub enum ReplicaCreateDropReason {
303 Manual,
308 ClusterScheduling(Vec<SchedulingDecision>),
311}
312
313impl ReplicaCreateDropReason {
314 pub fn into_audit_log(
315 self,
316 ) -> (
317 CreateOrDropClusterReplicaReasonV1,
318 Option<SchedulingDecisionsWithReasonsV2>,
319 ) {
320 let (reason, scheduling_policies) = match self {
321 ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
322 ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
323 CreateOrDropClusterReplicaReasonV1::Schedule,
324 Some(scheduling_decisions),
325 ),
326 };
327 (
328 reason,
329 scheduling_policies
330 .as_ref()
331 .map(SchedulingDecision::reasons_to_audit_log_reasons),
332 )
333 }
334}
335
336pub struct TransactionResult {
337 pub builtin_table_updates: Vec<BuiltinTableUpdate>,
338 pub catalog_updates: Vec<ParsedStateUpdate>,
340 pub audit_events: Vec<VersionedEvent>,
341}
342
343impl Catalog {
344 fn should_audit_log_item(item: &CatalogItem) -> bool {
345 !item.is_temporary()
346 }
347
348 fn temporary_ids(
351 &self,
352 ops: &[Op],
353 temporary_drops: BTreeSet<(&ConnectionId, String)>,
354 ) -> Result<BTreeSet<CatalogItemId>, Error> {
355 let mut creating = BTreeSet::new();
356 let mut temporary_ids = BTreeSet::new();
357 for op in ops.iter() {
358 if let Op::CreateItem {
359 id,
360 name,
361 item,
362 owner_id: _,
363 } = op
364 {
365 if let Some(conn_id) = item.conn_id() {
366 if self.item_exists_in_temp_schemas(conn_id, &name.item)
367 && !temporary_drops.contains(&(conn_id, name.item.clone()))
368 || creating.contains(&(conn_id, &name.item))
369 {
370 return Err(
371 SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
372 );
373 } else {
374 creating.insert((conn_id, &name.item));
375 temporary_ids.insert(id.clone());
376 }
377 }
378 }
379 }
380 Ok(temporary_ids)
381 }
382
383 #[instrument(name = "catalog::transact")]
384 pub async fn transact(
385 &mut self,
386 storage_collections: Option<
389 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
390 >,
391 oracle_write_ts: mz_repr::Timestamp,
392 session: Option<&ConnMeta>,
393 ops: Vec<Op>,
394 ) -> Result<TransactionResult, AdapterError> {
395 trace!("transact: {:?}", ops);
396 fail::fail_point!("catalog_transact", |arg| {
397 Err(AdapterError::Unstructured(anyhow::anyhow!(
398 "failpoint: {arg:?}"
399 )))
400 });
401
402 let drop_ids: BTreeSet<CatalogItemId> = ops
403 .iter()
404 .filter_map(|op| match op {
405 Op::DropObjects(drop_object_infos) => {
406 let ids = drop_object_infos.iter().map(|info| info.to_object_id());
407 let item_ids = ids.filter_map(|id| match id {
408 ObjectId::Item(id) => Some(id),
409 _ => None,
410 });
411 Some(item_ids)
412 }
413 _ => None,
414 })
415 .flatten()
416 .collect();
417 let temporary_drops = drop_ids
418 .iter()
419 .filter_map(|id| {
420 let entry = self.get_entry(id);
421 match entry.item().conn_id() {
422 Some(conn_id) => Some((conn_id, entry.name().item.clone())),
423 None => None,
424 }
425 })
426 .collect();
427 let dropped_global_ids = drop_ids
428 .iter()
429 .flat_map(|item_id| self.get_global_ids(item_id))
430 .collect();
431
432 let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
433 let mut builtin_table_updates = vec![];
434 let mut catalog_updates = vec![];
435 let mut audit_events = vec![];
436 let mut storage = self.storage().await;
437 let mut tx = storage
438 .transaction()
439 .await
440 .unwrap_or_terminate("starting catalog transaction");
441
442 let new_state = Self::transact_inner(
443 storage_collections,
444 oracle_write_ts,
445 session,
446 ops,
447 temporary_ids,
448 &mut builtin_table_updates,
449 &mut catalog_updates,
450 &mut audit_events,
451 &mut tx,
452 &self.state,
453 )
454 .await?;
455
456 tx.commit(oracle_write_ts)
461 .await
462 .unwrap_or_terminate("catalog storage transaction commit must succeed");
463
464 drop(storage);
467 if let Some(new_state) = new_state {
468 self.transient_revision += 1;
469 self.state = new_state;
470 }
471
472 let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
474 if self.state.system_config().enable_mz_notices() {
475 self.state().pack_optimizer_notices(
477 &mut builtin_table_updates,
478 dropped_notices.iter(),
479 Diff::MINUS_ONE,
480 );
481 }
482
483 Ok(TransactionResult {
484 builtin_table_updates,
485 catalog_updates,
486 audit_events,
487 })
488 }
489
490 fn extract_expressions_from_ops(
494 ops: &[Op],
495 optimizer_features: &OptimizerFeatures,
496 ) -> BTreeMap<GlobalId, LocalExpressions> {
497 let mut exprs = BTreeMap::new();
498
499 for op in ops {
500 if let Op::CreateItem { item, .. } = op {
501 match item {
502 CatalogItem::View(view) => {
503 exprs.insert(
504 view.global_id,
505 LocalExpressions {
506 local_mir: (*view.optimized_expr).clone(),
507 optimizer_features: optimizer_features.clone(),
508 },
509 );
510 }
511 CatalogItem::MaterializedView(mv) => {
512 exprs.insert(
513 mv.global_id_writes(),
514 LocalExpressions {
515 local_mir: (*mv.optimized_expr).clone(),
516 optimizer_features: optimizer_features.clone(),
517 },
518 );
519 }
520 _ => {}
521 }
522 }
523 }
524
525 exprs
526 }
527
528 #[instrument(name = "catalog::transact_inner")]
536 async fn transact_inner(
537 storage_collections: Option<
538 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
539 >,
540 oracle_write_ts: mz_repr::Timestamp,
541 session: Option<&ConnMeta>,
542 mut ops: Vec<Op>,
543 temporary_ids: BTreeSet<CatalogItemId>,
544 builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
545 parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
546 audit_events: &mut Vec<VersionedEvent>,
547 tx: &mut Transaction<'_>,
548 state: &CatalogState,
549 ) -> Result<Option<CatalogState>, AdapterError> {
550 let mut preliminary_state = Cow::Borrowed(state);
577
578 let mut state = Cow::Borrowed(state);
580
581 let dry_run_ops = match ops.last() {
582 Some(Op::TransactionDryRun) => {
583 ops.pop();
585 assert!(!ops.is_empty(), "TransactionDryRun must not be the only op");
586 ops.clone()
587 }
588 Some(_) => vec![],
589 None => return Ok(None),
590 };
591
592 let optimizer_features = OptimizerFeatures::from(state.system_config());
595 let cached_exprs = Self::extract_expressions_from_ops(&ops, &optimizer_features);
596
597 let mut storage_collections_to_create = BTreeSet::new();
598 let mut storage_collections_to_drop = BTreeSet::new();
599 let mut storage_collections_to_register = BTreeMap::new();
600
601 let mut updates = Vec::new();
602
603 for op in ops {
604 let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
605 oracle_write_ts,
606 session,
607 op,
608 &temporary_ids,
609 audit_events,
610 tx,
611 &*preliminary_state,
612 &mut storage_collections_to_create,
613 &mut storage_collections_to_drop,
614 &mut storage_collections_to_register,
615 )
616 .await?;
617
618 if let Some(builtin_table_update) = weird_builtin_table_update {
627 builtin_table_updates.push(builtin_table_update);
628 }
629
630 let upper = tx.upper();
635 let temporary_item_updates =
636 temporary_item_updates
637 .into_iter()
638 .map(|(item, diff)| StateUpdate {
639 kind: StateUpdateKind::TemporaryItem(item),
640 ts: upper,
641 diff,
642 });
643
644 let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
645 op_updates.extend(temporary_item_updates);
646 if !op_updates.is_empty() {
647 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
650 let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
651 .to_mut()
652 .apply_updates(op_updates.clone(), &mut local_expr_cache)
653 .await;
654 }
655 updates.append(&mut op_updates);
656 }
657
658 if !updates.is_empty() {
659 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
660 let (op_builtin_table_updates, op_catalog_updates) = state
661 .to_mut()
662 .apply_updates(updates.clone(), &mut local_expr_cache)
663 .await;
664 let op_builtin_table_updates = state
665 .to_mut()
666 .resolve_builtin_table_updates(op_builtin_table_updates);
667 builtin_table_updates.extend(op_builtin_table_updates);
668 parsed_catalog_updates.extend(op_catalog_updates);
669 }
670
671 if dry_run_ops.is_empty() {
672 if let Some(c) = storage_collections {
674 c.prepare_state(
675 tx,
676 storage_collections_to_create,
677 storage_collections_to_drop,
678 storage_collections_to_register,
679 )
680 .await?;
681 }
682
683 let updates = tx.get_and_commit_op_updates();
684 if !updates.is_empty() {
685 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
686 let (op_builtin_table_updates, op_catalog_updates) = state
687 .to_mut()
688 .apply_updates(updates.clone(), &mut local_expr_cache)
689 .await;
690 let op_builtin_table_updates = state
691 .to_mut()
692 .resolve_builtin_table_updates(op_builtin_table_updates);
693 builtin_table_updates.extend(op_builtin_table_updates);
694 parsed_catalog_updates.extend(op_catalog_updates);
695 }
696
697 match state {
698 Cow::Owned(state) => Ok(Some(state)),
699 Cow::Borrowed(_) => Ok(None),
700 }
701 } else {
702 Err(AdapterError::TransactionDryRun {
703 new_ops: dry_run_ops,
704 new_state: state.into_owned(),
705 })
706 }
707 }
708
709 #[instrument]
717 async fn transact_op(
718 oracle_write_ts: mz_repr::Timestamp,
719 session: Option<&ConnMeta>,
720 op: Op,
721 temporary_ids: &BTreeSet<CatalogItemId>,
722 audit_events: &mut Vec<VersionedEvent>,
723 tx: &mut Transaction<'_>,
724 state: &CatalogState,
725 storage_collections_to_create: &mut BTreeSet<GlobalId>,
726 storage_collections_to_drop: &mut BTreeSet<GlobalId>,
727 storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
728 ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
729 let mut weird_builtin_table_update = None;
730 let mut temporary_item_updates = Vec::new();
731
732 match op {
733 Op::TransactionDryRun => {
734 unreachable!("TransactionDryRun can only be used a final element of ops")
735 }
736 Op::AlterRetainHistory { id, value, window } => {
737 let entry = state.get_entry(&id);
738 if id.is_system() {
739 let name = entry.name();
740 let full_name =
741 state.resolve_full_name(name, session.map(|session| session.conn_id()));
742 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
743 full_name.to_string(),
744 ))));
745 }
746
747 let mut new_entry = entry.clone();
748 let previous = new_entry
749 .item
750 .update_retain_history(value.clone(), window)
751 .map_err(|_| {
752 AdapterError::Catalog(Error::new(ErrorKind::Internal(
753 "planner should have rejected invalid alter retain history item type"
754 .to_string(),
755 )))
756 })?;
757
758 if Self::should_audit_log_item(new_entry.item()) {
759 let details =
760 EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
761 id: id.to_string(),
762 old_history: previous.map(|previous| previous.to_string()),
763 new_history: value.map(|v| v.to_string()),
764 });
765 CatalogState::add_to_audit_log(
766 &state.system_configuration,
767 oracle_write_ts,
768 session,
769 tx,
770 audit_events,
771 EventType::Alter,
772 catalog_type_to_audit_object_type(new_entry.item().typ()),
773 details,
774 )?;
775 }
776
777 tx.update_item(id, new_entry.into())?;
778
779 Self::log_update(state, &id);
780 }
781 Op::AlterRole {
782 id,
783 name,
784 attributes,
785 nopassword,
786 vars,
787 } => {
788 state.ensure_not_reserved_role(&id)?;
789
790 let mut existing_role = state.get_role(&id).clone();
791 let password = attributes.password.clone();
792 let scram_iterations = attributes
793 .scram_iterations
794 .unwrap_or_else(|| state.system_config().scram_iterations());
795 existing_role.attributes = attributes.into();
796 existing_role.vars = vars;
797 let password_action = if nopassword {
798 PasswordAction::Clear
799 } else if let Some(password) = password {
800 PasswordAction::Set(PasswordConfig {
801 password,
802 scram_iterations,
803 })
804 } else {
805 PasswordAction::NoChange
806 };
807 tx.update_role(id, existing_role.into(), password_action)?;
808
809 CatalogState::add_to_audit_log(
810 &state.system_configuration,
811 oracle_write_ts,
812 session,
813 tx,
814 audit_events,
815 EventType::Alter,
816 ObjectType::Role,
817 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
818 id: id.to_string(),
819 name: name.clone(),
820 }),
821 )?;
822
823 info!("update role {name} ({id})");
824 }
825 Op::AlterNetworkPolicy {
826 id,
827 rules,
828 name,
829 owner_id: _owner_id,
830 } => {
831 let existing_policy = state.get_network_policy(&id).clone();
832 let mut policy: NetworkPolicy = existing_policy.into();
833 policy.rules = rules;
834 if is_reserved_name(&name) {
835 return Err(AdapterError::Catalog(Error::new(
836 ErrorKind::ReservedNetworkPolicyName(name),
837 )));
838 }
839 tx.update_network_policy(id, policy.clone())?;
840
841 CatalogState::add_to_audit_log(
842 &state.system_configuration,
843 oracle_write_ts,
844 session,
845 tx,
846 audit_events,
847 EventType::Alter,
848 ObjectType::NetworkPolicy,
849 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
850 id: id.to_string(),
851 name: name.clone(),
852 }),
853 )?;
854
855 info!("update network policy {name} ({id})");
856 }
857 Op::AlterAddColumn {
858 id,
859 new_global_id,
860 name,
861 typ,
862 sql,
863 } => {
864 let mut new_entry = state.get_entry(&id).clone();
865 let version = new_entry.item.add_column(name, typ, sql)?;
866 let shard_id = state
869 .storage_metadata()
870 .get_collection_shard(new_entry.latest_global_id())?;
871
872 let CatalogItem::Table(table) = &mut new_entry.item else {
874 return Err(AdapterError::Unsupported("adding columns to non-Table"));
875 };
876 table.collections.insert(version, new_global_id);
877
878 tx.update_item(id, new_entry.into())?;
879 storage_collections_to_register.insert(new_global_id, shard_id);
880 }
881 Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
882 let mut new_entry = state.get_entry(&id).clone();
883 let replacement = state.get_entry(&replacement_id);
884
885 let new_audit_events =
886 apply_replacement_audit_events(state, &new_entry, replacement);
887
888 let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
889 return Err(AdapterError::internal(
890 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
891 "id must refer to a materialized view",
892 ));
893 };
894 let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
895 return Err(AdapterError::internal(
896 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
897 "replacement_id must refer to a materialized view",
898 ));
899 };
900
901 mv.apply_replacement(replacement_mv.clone());
902
903 tx.remove_item(replacement_id)?;
904
905 new_entry.id = replacement_id;
906 tx_replace_item(tx, state, id, new_entry)?;
907
908 let comment_id = CommentObjectId::MaterializedView(replacement_id);
909 tx.drop_comments(&[comment_id].into())?;
910
911 for (event_type, details) in new_audit_events {
912 CatalogState::add_to_audit_log(
913 &state.system_configuration,
914 oracle_write_ts,
915 session,
916 tx,
917 audit_events,
918 event_type,
919 ObjectType::MaterializedView,
920 details,
921 )?;
922 }
923 }
924 Op::CreateDatabase { name, owner_id } => {
925 let database_owner_privileges = vec![rbac::owner_privilege(
926 mz_sql::catalog::ObjectType::Database,
927 owner_id,
928 )];
929 let database_default_privileges = state
930 .default_privileges
931 .get_applicable_privileges(
932 owner_id,
933 None,
934 None,
935 mz_sql::catalog::ObjectType::Database,
936 )
937 .map(|item| item.mz_acl_item(owner_id));
938 let database_privileges: Vec<_> = merge_mz_acl_items(
939 database_owner_privileges
940 .into_iter()
941 .chain(database_default_privileges),
942 )
943 .collect();
944
945 let schema_owner_privileges = vec![rbac::owner_privilege(
946 mz_sql::catalog::ObjectType::Schema,
947 owner_id,
948 )];
949 let schema_default_privileges = state
950 .default_privileges
951 .get_applicable_privileges(
952 owner_id,
953 None,
954 None,
955 mz_sql::catalog::ObjectType::Schema,
956 )
957 .map(|item| item.mz_acl_item(owner_id))
958 .chain(std::iter::once(MzAclItem {
960 grantee: RoleId::Public,
961 grantor: owner_id,
962 acl_mode: AclMode::USAGE,
963 }));
964 let schema_privileges: Vec<_> = merge_mz_acl_items(
965 schema_owner_privileges
966 .into_iter()
967 .chain(schema_default_privileges),
968 )
969 .collect();
970
971 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
972 let (database_id, _) = tx.insert_user_database(
973 &name,
974 owner_id,
975 database_privileges.clone(),
976 &temporary_oids,
977 )?;
978 let (schema_id, _) = tx.insert_user_schema(
979 database_id,
980 DEFAULT_SCHEMA,
981 owner_id,
982 schema_privileges.clone(),
983 &temporary_oids,
984 )?;
985 CatalogState::add_to_audit_log(
986 &state.system_configuration,
987 oracle_write_ts,
988 session,
989 tx,
990 audit_events,
991 EventType::Create,
992 ObjectType::Database,
993 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
994 id: database_id.to_string(),
995 name: name.clone(),
996 }),
997 )?;
998 info!("create database {}", name);
999
1000 CatalogState::add_to_audit_log(
1001 &state.system_configuration,
1002 oracle_write_ts,
1003 session,
1004 tx,
1005 audit_events,
1006 EventType::Create,
1007 ObjectType::Schema,
1008 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1009 id: schema_id.to_string(),
1010 name: DEFAULT_SCHEMA.to_string(),
1011 database_name: Some(name),
1012 }),
1013 )?;
1014 }
1015 Op::CreateSchema {
1016 database_id,
1017 schema_name,
1018 owner_id,
1019 } => {
1020 if is_reserved_name(&schema_name) {
1021 return Err(AdapterError::Catalog(Error::new(
1022 ErrorKind::ReservedSchemaName(schema_name),
1023 )));
1024 }
1025 let database_id = match database_id {
1026 ResolvedDatabaseSpecifier::Id(id) => id,
1027 ResolvedDatabaseSpecifier::Ambient => {
1028 return Err(AdapterError::Catalog(Error::new(
1029 ErrorKind::ReadOnlySystemSchema(schema_name),
1030 )));
1031 }
1032 };
1033 let owner_privileges = vec![rbac::owner_privilege(
1034 mz_sql::catalog::ObjectType::Schema,
1035 owner_id,
1036 )];
1037 let default_privileges = state
1038 .default_privileges
1039 .get_applicable_privileges(
1040 owner_id,
1041 Some(database_id),
1042 None,
1043 mz_sql::catalog::ObjectType::Schema,
1044 )
1045 .map(|item| item.mz_acl_item(owner_id));
1046 let privileges: Vec<_> =
1047 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1048 .collect();
1049 let (schema_id, _) = tx.insert_user_schema(
1050 database_id,
1051 &schema_name,
1052 owner_id,
1053 privileges.clone(),
1054 &state.get_temporary_oids().collect(),
1055 )?;
1056 CatalogState::add_to_audit_log(
1057 &state.system_configuration,
1058 oracle_write_ts,
1059 session,
1060 tx,
1061 audit_events,
1062 EventType::Create,
1063 ObjectType::Schema,
1064 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1065 id: schema_id.to_string(),
1066 name: schema_name.clone(),
1067 database_name: Some(state.database_by_id[&database_id].name.clone()),
1068 }),
1069 )?;
1070 }
1071 Op::CreateRole { name, attributes } => {
1072 if is_reserved_role_name(&name) {
1073 return Err(AdapterError::Catalog(Error::new(
1074 ErrorKind::ReservedRoleName(name),
1075 )));
1076 }
1077 let membership = RoleMembership::new();
1078 let vars = RoleVars::default();
1079 let (id, _) = tx.insert_user_role(
1080 name.clone(),
1081 attributes.clone(),
1082 membership.clone(),
1083 vars.clone(),
1084 &state.get_temporary_oids().collect(),
1085 )?;
1086 CatalogState::add_to_audit_log(
1087 &state.system_configuration,
1088 oracle_write_ts,
1089 session,
1090 tx,
1091 audit_events,
1092 EventType::Create,
1093 ObjectType::Role,
1094 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1095 id: id.to_string(),
1096 name: name.clone(),
1097 }),
1098 )?;
1099 info!("create role {}", name);
1100 }
1101 Op::CreateCluster {
1102 id,
1103 name,
1104 introspection_sources,
1105 owner_id,
1106 config,
1107 } => {
1108 if is_reserved_name(&name) {
1109 return Err(AdapterError::Catalog(Error::new(
1110 ErrorKind::ReservedClusterName(name),
1111 )));
1112 }
1113 let owner_privileges = vec![rbac::owner_privilege(
1114 mz_sql::catalog::ObjectType::Cluster,
1115 owner_id,
1116 )];
1117 let default_privileges = state
1118 .default_privileges
1119 .get_applicable_privileges(
1120 owner_id,
1121 None,
1122 None,
1123 mz_sql::catalog::ObjectType::Cluster,
1124 )
1125 .map(|item| item.mz_acl_item(owner_id));
1126 let privileges: Vec<_> =
1127 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1128 .collect();
1129 let introspection_source_ids: Vec<_> = introspection_sources
1130 .iter()
1131 .map(|introspection_source| {
1132 Transaction::allocate_introspection_source_index_id(
1133 &id,
1134 introspection_source.variant,
1135 )
1136 })
1137 .collect();
1138
1139 let introspection_sources = introspection_sources
1140 .into_iter()
1141 .zip_eq(introspection_source_ids)
1142 .map(|(log, (item_id, gid))| (log, item_id, gid))
1143 .collect();
1144
1145 tx.insert_user_cluster(
1146 id,
1147 &name,
1148 introspection_sources,
1149 owner_id,
1150 privileges.clone(),
1151 config.clone().into(),
1152 &state.get_temporary_oids().collect(),
1153 )?;
1154 CatalogState::add_to_audit_log(
1155 &state.system_configuration,
1156 oracle_write_ts,
1157 session,
1158 tx,
1159 audit_events,
1160 EventType::Create,
1161 ObjectType::Cluster,
1162 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1163 id: id.to_string(),
1164 name: name.clone(),
1165 }),
1166 )?;
1167 info!("create cluster {}", name);
1168 }
1169 Op::CreateClusterReplica {
1170 cluster_id,
1171 name,
1172 config,
1173 owner_id,
1174 reason,
1175 } => {
1176 if is_reserved_name(&name) {
1177 return Err(AdapterError::Catalog(Error::new(
1178 ErrorKind::ReservedReplicaName(name),
1179 )));
1180 }
1181 let cluster = state.get_cluster(cluster_id);
1182 let id =
1183 tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1184 if let ReplicaLocation::Managed(ManagedReplicaLocation {
1185 size,
1186 billed_as,
1187 internal,
1188 ..
1189 }) = &config.location
1190 {
1191 let (reason, scheduling_policies) = reason.into_audit_log();
1192 let details = EventDetails::CreateClusterReplicaV4(
1193 mz_audit_log::CreateClusterReplicaV4 {
1194 cluster_id: cluster_id.to_string(),
1195 cluster_name: cluster.name.clone(),
1196 replica_id: Some(id.to_string()),
1197 replica_name: name.clone(),
1198 logical_size: size.clone(),
1199 billed_as: billed_as.clone(),
1200 internal: *internal,
1201 reason,
1202 scheduling_policies,
1203 },
1204 );
1205 CatalogState::add_to_audit_log(
1206 &state.system_configuration,
1207 oracle_write_ts,
1208 session,
1209 tx,
1210 audit_events,
1211 EventType::Create,
1212 ObjectType::ClusterReplica,
1213 details,
1214 )?;
1215 }
1216 }
1217 Op::CreateItem {
1218 id,
1219 name,
1220 item,
1221 owner_id,
1222 } => {
1223 state.check_unstable_dependencies(&item)?;
1224
1225 match &item {
1226 CatalogItem::Table(table) => {
1227 let gids: Vec<_> = table.global_ids().collect();
1228 assert_eq!(gids.len(), 1);
1229 storage_collections_to_create.extend(gids);
1230 }
1231 CatalogItem::Source(source) => {
1232 storage_collections_to_create.insert(source.global_id());
1233 }
1234 CatalogItem::MaterializedView(mv) => {
1235 let mv_gid = mv.global_id_writes();
1236 if let Some(target_id) = mv.replacement_target {
1237 let target_gid = state.get_entry(&target_id).latest_global_id();
1238 let shard_id =
1239 state.storage_metadata().get_collection_shard(target_gid)?;
1240 storage_collections_to_register.insert(mv_gid, shard_id);
1241 } else {
1242 storage_collections_to_create.insert(mv_gid);
1243 }
1244 }
1245 CatalogItem::ContinualTask(ct) => {
1246 storage_collections_to_create.insert(ct.global_id());
1247 }
1248 CatalogItem::Sink(sink) => {
1249 storage_collections_to_create.insert(sink.global_id());
1250 }
1251 CatalogItem::Log(_)
1252 | CatalogItem::View(_)
1253 | CatalogItem::Index(_)
1254 | CatalogItem::Type(_)
1255 | CatalogItem::Func(_)
1256 | CatalogItem::Secret(_)
1257 | CatalogItem::Connection(_) => (),
1258 }
1259
1260 let system_user = session.map_or(false, |s| s.user().is_system_user());
1261 if !system_user {
1262 if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1263 let cluster_name = state.clusters_by_id[&id].name.clone();
1264 return Err(AdapterError::Catalog(Error::new(
1265 ErrorKind::ReadOnlyCluster(cluster_name),
1266 )));
1267 }
1268 }
1269
1270 let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1271 let default_privileges = state
1272 .default_privileges
1273 .get_applicable_privileges(
1274 owner_id,
1275 name.qualifiers.database_spec.id(),
1276 Some(name.qualifiers.schema_spec.into()),
1277 item.typ().into(),
1278 )
1279 .map(|item| item.mz_acl_item(owner_id));
1280 let progress_source_privilege = if item.is_progress_source() {
1282 Some(MzAclItem {
1283 grantee: MZ_SUPPORT_ROLE_ID,
1284 grantor: owner_id,
1285 acl_mode: AclMode::SELECT,
1286 })
1287 } else {
1288 None
1289 };
1290 let privileges: Vec<_> = merge_mz_acl_items(
1291 owner_privileges
1292 .into_iter()
1293 .chain(default_privileges)
1294 .chain(progress_source_privilege),
1295 )
1296 .collect();
1297
1298 let temporary_oids = state.get_temporary_oids().collect();
1299
1300 if item.is_temporary() {
1301 if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1302 || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1303 {
1304 return Err(AdapterError::Catalog(Error::new(
1305 ErrorKind::InvalidTemporarySchema,
1306 )));
1307 }
1308 let oid = tx.allocate_oid(&temporary_oids)?;
1309
1310 let schema_id = name.qualifiers.schema_spec.clone().into();
1311 let item_type = item.typ();
1312 let (create_sql, global_id, versions) = item.to_serialized();
1313
1314 let item = TemporaryItem {
1315 id,
1316 oid,
1317 global_id,
1318 schema_id,
1319 name: name.item.clone(),
1320 create_sql,
1321 conn_id: item.conn_id().cloned(),
1322 owner_id,
1323 privileges: privileges.clone(),
1324 extra_versions: versions,
1325 };
1326 temporary_item_updates.push((item, StateDiff::Addition));
1327
1328 info!(
1329 "create temporary {} {} ({})",
1330 item_type,
1331 state.resolve_full_name(&name, None),
1332 id
1333 );
1334 } else {
1335 if let Some(temp_id) =
1336 item.uses()
1337 .iter()
1338 .find(|id| match state.try_get_entry(*id) {
1339 Some(entry) => entry.item().is_temporary(),
1340 None => temporary_ids.contains(id),
1341 })
1342 {
1343 let temp_item = state.get_entry(temp_id);
1344 return Err(AdapterError::Catalog(Error::new(
1345 ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1346 )));
1347 }
1348 if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1349 && !system_user
1350 {
1351 let schema_name = state
1352 .resolve_full_name(&name, session.map(|session| session.conn_id()))
1353 .schema;
1354 return Err(AdapterError::Catalog(Error::new(
1355 ErrorKind::ReadOnlySystemSchema(schema_name),
1356 )));
1357 }
1358 let schema_id = name.qualifiers.schema_spec.clone().into();
1359 let item_type = item.typ();
1360 let (create_sql, global_id, versions) = item.to_serialized();
1361 tx.insert_user_item(
1362 id,
1363 global_id,
1364 schema_id,
1365 &name.item,
1366 create_sql,
1367 owner_id,
1368 privileges.clone(),
1369 &temporary_oids,
1370 versions,
1371 )?;
1372 info!(
1373 "create {} {} ({})",
1374 item_type,
1375 state.resolve_full_name(&name, None),
1376 id
1377 );
1378 }
1379
1380 if Self::should_audit_log_item(&item) {
1381 let name = Self::full_name_detail(
1382 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1383 );
1384 let details = match &item {
1385 CatalogItem::Source(s) => {
1386 let cluster_id = match s.data_source {
1387 DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1390 match state.get_entry(&ingestion_id).cluster_id() {
1391 Some(cluster_id) => Some(cluster_id.to_string()),
1392 None => None,
1393 }
1394 }
1395 _ => match item.cluster_id() {
1396 Some(cluster_id) => Some(cluster_id.to_string()),
1397 None => None,
1398 },
1399 };
1400
1401 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1402 id: id.to_string(),
1403 cluster_id,
1404 name,
1405 external_type: s.source_type().to_string(),
1406 })
1407 }
1408 CatalogItem::Sink(s) => {
1409 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1410 id: id.to_string(),
1411 cluster_id: Some(s.cluster_id.to_string()),
1412 name,
1413 external_type: s.sink_type().to_string(),
1414 })
1415 }
1416 CatalogItem::Index(i) => {
1417 EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1418 id: id.to_string(),
1419 name,
1420 cluster_id: i.cluster_id.to_string(),
1421 })
1422 }
1423 CatalogItem::MaterializedView(mv) => {
1424 EventDetails::CreateMaterializedViewV1(
1425 mz_audit_log::CreateMaterializedViewV1 {
1426 id: id.to_string(),
1427 name,
1428 cluster_id: mv.cluster_id.to_string(),
1429 replacement_target_id: mv
1430 .replacement_target
1431 .map(|id| id.to_string()),
1432 },
1433 )
1434 }
1435 _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1436 id: id.to_string(),
1437 name,
1438 }),
1439 };
1440 CatalogState::add_to_audit_log(
1441 &state.system_configuration,
1442 oracle_write_ts,
1443 session,
1444 tx,
1445 audit_events,
1446 EventType::Create,
1447 catalog_type_to_audit_object_type(item.typ()),
1448 details,
1449 )?;
1450 }
1451 }
1452 Op::CreateNetworkPolicy {
1453 rules,
1454 name,
1455 owner_id,
1456 } => {
1457 if state.network_policies_by_name.contains_key(&name) {
1458 return Err(AdapterError::PlanError(PlanError::Catalog(
1459 SqlCatalogError::NetworkPolicyAlreadyExists(name),
1460 )));
1461 }
1462 if is_reserved_name(&name) {
1463 return Err(AdapterError::Catalog(Error::new(
1464 ErrorKind::ReservedNetworkPolicyName(name),
1465 )));
1466 }
1467
1468 let owner_privileges = vec![rbac::owner_privilege(
1469 mz_sql::catalog::ObjectType::NetworkPolicy,
1470 owner_id,
1471 )];
1472 let default_privileges = state
1473 .default_privileges
1474 .get_applicable_privileges(
1475 owner_id,
1476 None,
1477 None,
1478 mz_sql::catalog::ObjectType::NetworkPolicy,
1479 )
1480 .map(|item| item.mz_acl_item(owner_id));
1481 let privileges: Vec<_> =
1482 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1483 .collect();
1484
1485 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1486 let id = tx.insert_user_network_policy(
1487 name.clone(),
1488 rules,
1489 privileges,
1490 owner_id,
1491 &temporary_oids,
1492 )?;
1493
1494 CatalogState::add_to_audit_log(
1495 &state.system_configuration,
1496 oracle_write_ts,
1497 session,
1498 tx,
1499 audit_events,
1500 EventType::Create,
1501 ObjectType::NetworkPolicy,
1502 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1503 id: id.to_string(),
1504 name: name.clone(),
1505 }),
1506 )?;
1507
1508 info!("created network policy {name} ({id})");
1509 }
1510 Op::Comment {
1511 object_id,
1512 sub_component,
1513 comment,
1514 } => {
1515 tx.update_comment(object_id, sub_component, comment)?;
1516 let entry = state.get_comment_id_entry(&object_id);
1517 let should_log = entry
1518 .map(|entry| Self::should_audit_log_item(entry.item()))
1519 .unwrap_or(true);
1521 if let (Some(conn_id), true) =
1524 (session.map(|session| session.conn_id()), should_log)
1525 {
1526 CatalogState::add_to_audit_log(
1527 &state.system_configuration,
1528 oracle_write_ts,
1529 session,
1530 tx,
1531 audit_events,
1532 EventType::Comment,
1533 comment_id_to_audit_object_type(object_id),
1534 EventDetails::IdNameV1(IdNameV1 {
1535 id: format!("{object_id:?}"),
1537 name: state.comment_id_to_audit_log_name(object_id, conn_id),
1538 }),
1539 )?;
1540 }
1541 }
1542 Op::UpdateSourceReferences {
1543 source_id,
1544 references,
1545 } => {
1546 tx.update_source_references(
1547 source_id,
1548 references
1549 .references
1550 .into_iter()
1551 .map(|reference| reference.into())
1552 .collect(),
1553 references.updated_at,
1554 )?;
1555 }
1556 Op::DropObjects(drop_object_infos) => {
1557 let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1559
1560 tx.drop_comments(&delta.comments)?;
1562
1563 let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1565 delta
1566 .items
1567 .iter()
1568 .map(|id| id)
1569 .partition(|id| !state.get_entry(*id).item().is_temporary());
1570 tx.remove_items(&durable_items_to_drop)?;
1571 temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1572 let entry = state.get_entry(&id);
1573 (entry.clone().into(), StateDiff::Retraction)
1574 }));
1575
1576 for item_id in delta.items {
1577 let entry = state.get_entry(&item_id);
1578
1579 if entry.item().is_storage_collection() && entry.replacement_target().is_none()
1583 {
1584 storage_collections_to_drop.extend(entry.global_ids());
1585 }
1586
1587 if state.source_references.contains_key(&item_id) {
1588 tx.remove_source_references(item_id)?;
1589 }
1590
1591 if Self::should_audit_log_item(entry.item()) {
1592 CatalogState::add_to_audit_log(
1593 &state.system_configuration,
1594 oracle_write_ts,
1595 session,
1596 tx,
1597 audit_events,
1598 EventType::Drop,
1599 catalog_type_to_audit_object_type(entry.item().typ()),
1600 EventDetails::IdFullNameV1(IdFullNameV1 {
1601 id: item_id.to_string(),
1602 name: Self::full_name_detail(&state.resolve_full_name(
1603 entry.name(),
1604 session.map(|session| session.conn_id()),
1605 )),
1606 }),
1607 )?;
1608 }
1609 info!(
1610 "drop {} {} ({})",
1611 entry.item_type(),
1612 state.resolve_full_name(entry.name(), entry.conn_id()),
1613 item_id
1614 );
1615 }
1616
1617 let schemas = delta
1619 .schemas
1620 .iter()
1621 .map(|(schema_spec, database_spec)| {
1622 (SchemaId::from(schema_spec), *database_spec)
1623 })
1624 .collect();
1625 tx.remove_schemas(&schemas)?;
1626
1627 for (schema_spec, database_spec) in delta.schemas {
1628 let schema = state.get_schema(
1629 &database_spec,
1630 &schema_spec,
1631 session
1632 .map(|session| session.conn_id())
1633 .unwrap_or(&SYSTEM_CONN_ID),
1634 );
1635
1636 let schema_id = SchemaId::from(schema_spec);
1637 let database_id = match database_spec {
1638 ResolvedDatabaseSpecifier::Ambient => None,
1639 ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1640 };
1641
1642 CatalogState::add_to_audit_log(
1643 &state.system_configuration,
1644 oracle_write_ts,
1645 session,
1646 tx,
1647 audit_events,
1648 EventType::Drop,
1649 ObjectType::Schema,
1650 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1651 id: schema_id.to_string(),
1652 name: schema.name.schema.to_string(),
1653 database_name: database_id
1654 .map(|database_id| state.database_by_id[&database_id].name.clone()),
1655 }),
1656 )?;
1657 }
1658
1659 tx.remove_databases(&delta.databases)?;
1661
1662 for database_id in delta.databases {
1663 let database = state.get_database(&database_id).clone();
1664
1665 CatalogState::add_to_audit_log(
1666 &state.system_configuration,
1667 oracle_write_ts,
1668 session,
1669 tx,
1670 audit_events,
1671 EventType::Drop,
1672 ObjectType::Database,
1673 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1674 id: database_id.to_string(),
1675 name: database.name.clone(),
1676 }),
1677 )?;
1678 }
1679
1680 tx.remove_user_roles(&delta.roles)?;
1682
1683 for role_id in delta.roles {
1684 let role = state
1685 .roles_by_id
1686 .get(&role_id)
1687 .expect("catalog out of sync");
1688
1689 CatalogState::add_to_audit_log(
1690 &state.system_configuration,
1691 oracle_write_ts,
1692 session,
1693 tx,
1694 audit_events,
1695 EventType::Drop,
1696 ObjectType::Role,
1697 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1698 id: role.id.to_string(),
1699 name: role.name.clone(),
1700 }),
1701 )?;
1702 info!("drop role {}", role.name());
1703 }
1704
1705 tx.remove_network_policies(&delta.network_policies)?;
1707
1708 for network_policy_id in delta.network_policies {
1709 let policy = state
1710 .network_policies_by_id
1711 .get(&network_policy_id)
1712 .expect("catalog out of sync");
1713
1714 CatalogState::add_to_audit_log(
1715 &state.system_configuration,
1716 oracle_write_ts,
1717 session,
1718 tx,
1719 audit_events,
1720 EventType::Drop,
1721 ObjectType::NetworkPolicy,
1722 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1723 id: policy.id.to_string(),
1724 name: policy.name.clone(),
1725 }),
1726 )?;
1727 info!("drop network policy {}", policy.name.clone());
1728 }
1729
1730 let replicas = delta.replicas.keys().copied().collect();
1732 tx.remove_cluster_replicas(&replicas)?;
1733
1734 for (replica_id, (cluster_id, reason)) in delta.replicas {
1735 let cluster = state.get_cluster(cluster_id);
1736 let replica = cluster.replica(replica_id).expect("Must exist");
1737
1738 let (reason, scheduling_policies) = reason.into_audit_log();
1739 let details =
1740 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1741 cluster_id: cluster_id.to_string(),
1742 cluster_name: cluster.name.clone(),
1743 replica_id: Some(replica_id.to_string()),
1744 replica_name: replica.name.clone(),
1745 reason,
1746 scheduling_policies,
1747 });
1748 CatalogState::add_to_audit_log(
1749 &state.system_configuration,
1750 oracle_write_ts,
1751 session,
1752 tx,
1753 audit_events,
1754 EventType::Drop,
1755 ObjectType::ClusterReplica,
1756 details,
1757 )?;
1758 }
1759
1760 tx.remove_clusters(&delta.clusters)?;
1762
1763 for cluster_id in delta.clusters {
1764 let cluster = state.get_cluster(cluster_id);
1765
1766 CatalogState::add_to_audit_log(
1767 &state.system_configuration,
1768 oracle_write_ts,
1769 session,
1770 tx,
1771 audit_events,
1772 EventType::Drop,
1773 ObjectType::Cluster,
1774 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1775 id: cluster.id.to_string(),
1776 name: cluster.name.clone(),
1777 }),
1778 )?;
1779 }
1780 }
1781 Op::GrantRole {
1782 role_id,
1783 member_id,
1784 grantor_id,
1785 } => {
1786 state.ensure_not_reserved_role(&member_id)?;
1787 state.ensure_grantable_role(&role_id)?;
1788 if state.collect_role_membership(&role_id).contains(&member_id) {
1789 let group_role = state.get_role(&role_id);
1790 let member_role = state.get_role(&member_id);
1791 return Err(AdapterError::Catalog(Error::new(
1792 ErrorKind::CircularRoleMembership {
1793 role_name: group_role.name().to_string(),
1794 member_name: member_role.name().to_string(),
1795 },
1796 )));
1797 }
1798 let mut member_role = state.get_role(&member_id).clone();
1799 member_role.membership.map.insert(role_id, grantor_id);
1800 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1801
1802 CatalogState::add_to_audit_log(
1803 &state.system_configuration,
1804 oracle_write_ts,
1805 session,
1806 tx,
1807 audit_events,
1808 EventType::Grant,
1809 ObjectType::Role,
1810 EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1811 role_id: role_id.to_string(),
1812 member_id: member_id.to_string(),
1813 grantor_id: grantor_id.to_string(),
1814 executed_by: session
1815 .map(|session| session.authenticated_role_id())
1816 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1817 .to_string(),
1818 }),
1819 )?;
1820 }
1821 Op::RevokeRole {
1822 role_id,
1823 member_id,
1824 grantor_id,
1825 } => {
1826 state.ensure_not_reserved_role(&member_id)?;
1827 state.ensure_grantable_role(&role_id)?;
1828 let mut member_role = state.get_role(&member_id).clone();
1829 member_role.membership.map.remove(&role_id);
1830 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1831
1832 CatalogState::add_to_audit_log(
1833 &state.system_configuration,
1834 oracle_write_ts,
1835 session,
1836 tx,
1837 audit_events,
1838 EventType::Revoke,
1839 ObjectType::Role,
1840 EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1841 role_id: role_id.to_string(),
1842 member_id: member_id.to_string(),
1843 grantor_id: grantor_id.to_string(),
1844 executed_by: session
1845 .map(|session| session.authenticated_role_id())
1846 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1847 .to_string(),
1848 }),
1849 )?;
1850 }
            // Grants or revokes `privilege` on `target_id` (a specific object or the system
            // as a whole), persists the updated privileges and audit-logs the change.
            Op::UpdatePrivilege {
                target_id,
                privilege,
                variant,
            } => {
                // Applies the requested grant/revoke to whichever privilege map is passed in.
                let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
                    UpdatePrivilegeVariant::Grant => {
                        privileges.grant(privilege);
                    }
                    UpdatePrivilegeVariant::Revoke => {
                        privileges.revoke(&privilege);
                    }
                };
                // Each branch clones the current object, updates the clone's privileges and
                // writes the clone back.
                match &target_id {
                    SystemObjectId::Object(object_id) => match object_id {
                        ObjectId::Cluster(id) => {
                            let mut cluster = state.get_cluster(*id).clone();
                            update_privilege_fn(&mut cluster.privileges);
                            tx.update_cluster(*id, cluster.into())?;
                        }
                        ObjectId::Database(id) => {
                            let mut database = state.get_database(id).clone();
                            update_privilege_fn(&mut database.privileges);
                            tx.update_database(*id, database.into())?;
                        }
                        ObjectId::NetworkPolicy(id) => {
                            let mut policy = state.get_network_policy(id).clone();
                            update_privilege_fn(&mut policy.privileges);
                            tx.update_network_policy(*id, policy.into())?;
                        }
                        ObjectId::Schema((database_spec, schema_spec)) => {
                            let schema_id = schema_spec.clone().into();
                            let mut schema = state
                                .get_schema(
                                    database_spec,
                                    schema_spec,
                                    session
                                        .map(|session| session.conn_id())
                                        .unwrap_or(&SYSTEM_CONN_ID),
                                )
                                .clone();
                            update_privilege_fn(&mut schema.privileges);
                            tx.update_schema(schema_id, schema.into())?;
                        }
                        ObjectId::Item(id) => {
                            let entry = state.get_entry(id);
                            let mut new_entry = entry.clone();
                            update_privilege_fn(&mut new_entry.privileges);
                            if !new_entry.item().is_temporary() {
                                tx.update_item(*id, new_entry.into())?;
                            } else {
                                // Temporary items are updated by retracting the old entry
                                // and adding the new one.
                                temporary_item_updates
                                    .push((entry.clone().into(), StateDiff::Retraction));
                                temporary_item_updates
                                    .push((new_entry.into(), StateDiff::Addition));
                            }
                        }
                        // Nothing is persisted for roles and cluster replicas; the audit
                        // event below is still emitted.
                        ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
                    },
                    SystemObjectId::System => {
                        let mut system_privileges = state.system_privileges.clone();
                        update_privilege_fn(&mut system_privileges);
                        // Store the ACL mode left for this (grantee, grantor) pair after the
                        // update; `None` when no ACL item remains for the pair.
                        let new_privilege =
                            system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
                        tx.set_system_privilege(
                            privilege.grantee,
                            privilege.grantor,
                            new_privilege.map(|new_privilege| new_privilege.acl_mode),
                        )?;
                    }
                }
                let object_type = state.get_system_object_type(&target_id);
                let object_id_str = match &target_id {
                    SystemObjectId::System => "SYSTEM".to_string(),
                    SystemObjectId::Object(id) => id.to_string(),
                };
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    variant.into(),
                    system_object_type_to_audit_object_type(&object_type),
                    EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
                        object_id: object_id_str,
                        grantee_id: privilege.grantee.to_string(),
                        grantor_id: privilege.grantor.to_string(),
                        privileges: privilege.acl_mode.to_string(),
                    }),
                )?;
            }
1943 Op::UpdateDefaultPrivilege {
1944 privilege_object,
1945 privilege_acl_item,
1946 variant,
1947 } => {
1948 let mut default_privileges = state.default_privileges.clone();
1949 match variant {
1950 UpdatePrivilegeVariant::Grant => default_privileges
1951 .grant(privilege_object.clone(), privilege_acl_item.clone()),
1952 UpdatePrivilegeVariant::Revoke => {
1953 default_privileges.revoke(&privilege_object, &privilege_acl_item)
1954 }
1955 }
1956 let new_acl_mode = default_privileges
1957 .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
1958 tx.set_default_privilege(
1959 privilege_object.role_id,
1960 privilege_object.database_id,
1961 privilege_object.schema_id,
1962 privilege_object.object_type,
1963 privilege_acl_item.grantee,
1964 new_acl_mode.cloned(),
1965 )?;
1966 CatalogState::add_to_audit_log(
1967 &state.system_configuration,
1968 oracle_write_ts,
1969 session,
1970 tx,
1971 audit_events,
1972 variant.into(),
1973 object_type_to_audit_object_type(privilege_object.object_type),
1974 EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
1975 role_id: privilege_object.role_id.to_string(),
1976 database_id: privilege_object.database_id.map(|id| id.to_string()),
1977 schema_id: privilege_object.schema_id.map(|id| id.to_string()),
1978 grantee_id: privilege_acl_item.grantee.to_string(),
1979 privileges: privilege_acl_item.acl_mode.to_string(),
1980 }),
1981 )?;
1982 }
1983 Op::RenameCluster {
1984 id,
1985 name,
1986 to_name,
1987 check_reserved_names,
1988 } => {
1989 if id.is_system() {
1990 return Err(AdapterError::Catalog(Error::new(
1991 ErrorKind::ReadOnlyCluster(name.clone()),
1992 )));
1993 }
1994 if check_reserved_names && is_reserved_name(&to_name) {
1995 return Err(AdapterError::Catalog(Error::new(
1996 ErrorKind::ReservedClusterName(to_name),
1997 )));
1998 }
1999 tx.rename_cluster(id, &name, &to_name)?;
2000 CatalogState::add_to_audit_log(
2001 &state.system_configuration,
2002 oracle_write_ts,
2003 session,
2004 tx,
2005 audit_events,
2006 EventType::Alter,
2007 ObjectType::Cluster,
2008 EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
2009 id: id.to_string(),
2010 old_name: name.clone(),
2011 new_name: to_name.clone(),
2012 }),
2013 )?;
2014 info!("rename cluster {name} to {to_name}");
2015 }
2016 Op::RenameClusterReplica {
2017 cluster_id,
2018 replica_id,
2019 name,
2020 to_name,
2021 } => {
2022 if is_reserved_name(&to_name) {
2023 return Err(AdapterError::Catalog(Error::new(
2024 ErrorKind::ReservedReplicaName(to_name),
2025 )));
2026 }
2027 tx.rename_cluster_replica(replica_id, &name, &to_name)?;
2028 CatalogState::add_to_audit_log(
2029 &state.system_configuration,
2030 oracle_write_ts,
2031 session,
2032 tx,
2033 audit_events,
2034 EventType::Alter,
2035 ObjectType::ClusterReplica,
2036 EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
2037 cluster_id: cluster_id.to_string(),
2038 replica_id: replica_id.to_string(),
2039 old_name: name.replica.as_str().to_string(),
2040 new_name: to_name.clone(),
2041 }),
2042 )?;
2043 info!("rename cluster replica {name} to {to_name}");
2044 }
2045 Op::RenameItem {
2046 id,
2047 to_name,
2048 current_full_name,
2049 } => {
2050 let mut updates = Vec::new();
2051
2052 let entry = state.get_entry(&id);
2053 if let CatalogItem::Type(_) = entry.item() {
2054 return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
2055 current_full_name.to_string(),
2056 ))));
2057 }
2058
2059 if entry.id().is_system() {
2060 let name = state
2061 .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2062 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2063 name.to_string(),
2064 ))));
2065 }
2066
2067 let mut to_full_name = current_full_name.clone();
2068 to_full_name.item.clone_from(&to_name);
2069
2070 let mut to_qualified_name = entry.name().clone();
2071 to_qualified_name.item.clone_from(&to_name);
2072
2073 let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2074 id: id.to_string(),
2075 old_name: Self::full_name_detail(¤t_full_name),
2076 new_name: Self::full_name_detail(&to_full_name),
2077 });
2078 if Self::should_audit_log_item(entry.item()) {
2079 CatalogState::add_to_audit_log(
2080 &state.system_configuration,
2081 oracle_write_ts,
2082 session,
2083 tx,
2084 audit_events,
2085 EventType::Alter,
2086 catalog_type_to_audit_object_type(entry.item().typ()),
2087 details,
2088 )?;
2089 }
2090
2091 let mut new_entry = entry.clone();
2093 new_entry.name.item.clone_from(&to_name);
2094 new_entry.item = entry
2095 .item()
2096 .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2097 .map_err(|e| {
2098 Error::new(ErrorKind::from(AmbiguousRename {
2099 depender: state
2100 .resolve_full_name(entry.name(), entry.conn_id())
2101 .to_string(),
2102 dependee: state
2103 .resolve_full_name(entry.name(), entry.conn_id())
2104 .to_string(),
2105 message: e,
2106 }))
2107 })?;
2108
2109 for id in entry.referenced_by() {
2110 let dependent_item = state.get_entry(id);
2111 let mut to_entry = dependent_item.clone();
2112 to_entry.item = dependent_item
2113 .item()
2114 .rename_item_refs(
2115 current_full_name.clone(),
2116 to_full_name.item.clone(),
2117 false,
2118 )
2119 .map_err(|e| {
2120 Error::new(ErrorKind::from(AmbiguousRename {
2121 depender: state
2122 .resolve_full_name(
2123 dependent_item.name(),
2124 dependent_item.conn_id(),
2125 )
2126 .to_string(),
2127 dependee: state
2128 .resolve_full_name(entry.name(), entry.conn_id())
2129 .to_string(),
2130 message: e,
2131 }))
2132 })?;
2133
2134 if !to_entry.item().is_temporary() {
2135 tx.update_item(*id, to_entry.into())?;
2136 } else {
2137 temporary_item_updates
2138 .push((dependent_item.clone().into(), StateDiff::Retraction));
2139 temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2140 }
2141 updates.push(*id);
2142 }
2143 if !new_entry.item().is_temporary() {
2144 tx.update_item(id, new_entry.into())?;
2145 } else {
2146 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2147 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2148 }
2149
2150 updates.push(id);
2151 for id in updates {
2152 Self::log_update(state, &id);
2153 }
2154 }
            // Renames a schema in a user database and rewrites references to the old schema
            // name in the schema's items and in the items that reference them. Ambient
            // schemas and schemas without a `SchemaSpecifier::Id` are rejected.
            Op::RenameSchema {
                database_spec,
                schema_spec,
                new_name,
                check_reserved_names,
            } => {
                if check_reserved_names && is_reserved_name(&new_name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedSchemaName(new_name),
                    )));
                }

                let conn_id = session
                    .map(|session| session.conn_id())
                    .unwrap_or(&SYSTEM_CONN_ID);

                let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
                let cur_name = schema.name().schema.clone();

                // Schemas of the ambient database cannot be renamed.
                let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::AmbientSchemaRename(cur_name),
                    )));
                };
                let database = state.get_database(&database_id);
                let database_name = &database.name;

                // `updates` collects ids to log; `items_to_update` collects the rewritten
                // durable items, which are written in one batch below.
                let mut updates = Vec::new();
                let mut items_to_update = BTreeMap::new();

                // Rewrites schema references in a single item and queues the result.
                // NOTE(review): the early-return check only consults `items_to_update`, so
                // temporary items (queued in `temporary_item_updates` instead) are not
                // deduplicated by it — confirm that is intended.
                let mut update_item = |id| {
                    if items_to_update.contains_key(id) {
                        return Ok(());
                    }

                    let entry = state.get_entry(id);

                    let mut new_entry = entry.clone();
                    new_entry.item = entry
                        .item
                        .rename_schema_refs(database_name, &cur_name, &new_name)
                        .map_err(|(s, _i)| {
                            Error::new(ErrorKind::from(AmbiguousRename {
                                depender: state
                                    .resolve_full_name(entry.name(), entry.conn_id())
                                    .to_string(),
                                dependee: format!("{database_name}.{cur_name}"),
                                message: format!("ambiguous reference to schema named {s}"),
                            }))
                        })?;

                    if !new_entry.item().is_temporary() {
                        items_to_update.insert(*id, new_entry.into());
                    } else {
                        temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
                        temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
                    }
                    updates.push(id);

                    Ok::<_, AdapterError>(())
                };

                // Rewrite every item in the schema and every item that references one.
                for (_name, item_id) in &schema.items {
                    update_item(item_id)?;

                    for id in state.get_entry(item_id).referenced_by() {
                        update_item(id)?;
                    }
                }
                tx.update_items(items_to_update)?;

                // Only schemas identified by `SchemaSpecifier::Id` can be renamed; anything
                // else is reported as a read-only system schema.
                let SchemaSpecifier::Id(schema_id) = *schema.id() else {
                    let schema_name = schema.name().schema.clone();
                    return Err(AdapterError::Catalog(crate::catalog::Error::new(
                        crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
                    )));
                };

                let database_name = database_spec
                    .id()
                    .map(|id| state.get_database(&id).name.clone());
                let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
                    id: schema_id.to_string(),
                    old_name: schema.name().schema.clone(),
                    new_name: new_name.clone(),
                    database_name,
                });
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Alter,
                    mz_audit_log::ObjectType::Schema,
                    details,
                )?;

                // Finally rename the schema itself.
                let mut new_schema = schema.clone();
                new_schema.name.schema.clone_from(&new_name);
                tx.update_schema(schema_id, new_schema.into())?;

                for id in updates {
                    Self::log_update(state, id);
                }
            }
2271 Op::UpdateOwner { id, new_owner } => {
2272 let conn_id = session
2273 .map(|session| session.conn_id())
2274 .unwrap_or(&SYSTEM_CONN_ID);
2275 let old_owner = state
2276 .get_owner_id(&id, conn_id)
2277 .expect("cannot update the owner of an object without an owner");
2278 match &id {
2279 ObjectId::Cluster(id) => {
2280 let mut cluster = state.get_cluster(*id).clone();
2281 if id.is_system() {
2282 return Err(AdapterError::Catalog(Error::new(
2283 ErrorKind::ReadOnlyCluster(cluster.name),
2284 )));
2285 }
2286 Self::update_privilege_owners(
2287 &mut cluster.privileges,
2288 cluster.owner_id,
2289 new_owner,
2290 );
2291 cluster.owner_id = new_owner;
2292 tx.update_cluster(*id, cluster.into())?;
2293 }
2294 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2295 let cluster = state.get_cluster(*cluster_id);
2296 let mut replica = cluster
2297 .replica(*replica_id)
2298 .expect("catalog out of sync")
2299 .clone();
2300 if replica_id.is_system() {
2301 return Err(AdapterError::Catalog(Error::new(
2302 ErrorKind::ReadOnlyClusterReplica(replica.name),
2303 )));
2304 }
2305 replica.owner_id = new_owner;
2306 tx.update_cluster_replica(*replica_id, replica.into())?;
2307 }
2308 ObjectId::Database(id) => {
2309 let mut database = state.get_database(id).clone();
2310 if id.is_system() {
2311 return Err(AdapterError::Catalog(Error::new(
2312 ErrorKind::ReadOnlyDatabase(database.name),
2313 )));
2314 }
2315 Self::update_privilege_owners(
2316 &mut database.privileges,
2317 database.owner_id,
2318 new_owner,
2319 );
2320 database.owner_id = new_owner;
2321 tx.update_database(*id, database.clone().into())?;
2322 }
2323 ObjectId::Schema((database_spec, schema_spec)) => {
2324 let schema_id: SchemaId = schema_spec.clone().into();
2325 let mut schema = state
2326 .get_schema(database_spec, schema_spec, conn_id)
2327 .clone();
2328 if schema_id.is_system() {
2329 let name = schema.name();
2330 let full_name = state.resolve_full_schema_name(name);
2331 return Err(AdapterError::Catalog(Error::new(
2332 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2333 )));
2334 }
2335 Self::update_privilege_owners(
2336 &mut schema.privileges,
2337 schema.owner_id,
2338 new_owner,
2339 );
2340 schema.owner_id = new_owner;
2341 tx.update_schema(schema_id, schema.into())?;
2342 }
2343 ObjectId::Item(id) => {
2344 let entry = state.get_entry(id);
2345 let mut new_entry = entry.clone();
2346 if id.is_system() {
2347 let full_name = state.resolve_full_name(
2348 new_entry.name(),
2349 session.map(|session| session.conn_id()),
2350 );
2351 return Err(AdapterError::Catalog(Error::new(
2352 ErrorKind::ReadOnlyItem(full_name.to_string()),
2353 )));
2354 }
2355 Self::update_privilege_owners(
2356 &mut new_entry.privileges,
2357 new_entry.owner_id,
2358 new_owner,
2359 );
2360 new_entry.owner_id = new_owner;
2361 if !new_entry.item().is_temporary() {
2362 tx.update_item(*id, new_entry.into())?;
2363 } else {
2364 temporary_item_updates
2365 .push((entry.clone().into(), StateDiff::Retraction));
2366 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2367 }
2368 }
2369 ObjectId::NetworkPolicy(id) => {
2370 let mut policy = state.get_network_policy(id).clone();
2371 if id.is_system() {
2372 return Err(AdapterError::Catalog(Error::new(
2373 ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2374 )));
2375 }
2376 Self::update_privilege_owners(
2377 &mut policy.privileges,
2378 policy.owner_id,
2379 new_owner,
2380 );
2381 policy.owner_id = new_owner;
2382 tx.update_network_policy(*id, policy.into())?;
2383 }
2384 ObjectId::Role(_) => unreachable!("roles have no owner"),
2385 }
2386 let object_type = state.get_object_type(&id);
2387 CatalogState::add_to_audit_log(
2388 &state.system_configuration,
2389 oracle_write_ts,
2390 session,
2391 tx,
2392 audit_events,
2393 EventType::Alter,
2394 object_type_to_audit_object_type(object_type),
2395 EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2396 object_id: id.to_string(),
2397 old_owner_id: old_owner.to_string(),
2398 new_owner_id: new_owner.to_string(),
2399 }),
2400 )?;
2401 }
2402 Op::UpdateClusterConfig { id, name, config } => {
2403 let mut cluster = state.get_cluster(id).clone();
2404 cluster.config = config;
2405 tx.update_cluster(id, cluster.into())?;
2406 info!("update cluster {}", name);
2407
2408 CatalogState::add_to_audit_log(
2409 &state.system_configuration,
2410 oracle_write_ts,
2411 session,
2412 tx,
2413 audit_events,
2414 EventType::Alter,
2415 ObjectType::Cluster,
2416 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2417 id: id.to_string(),
2418 name,
2419 }),
2420 )?;
2421 }
2422 Op::UpdateClusterReplicaConfig {
2423 replica_id,
2424 cluster_id,
2425 config,
2426 } => {
2427 let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2428 info!("update replica {}", replica.name);
2429 tx.update_cluster_replica(
2430 replica_id,
2431 mz_catalog::durable::ClusterReplica {
2432 cluster_id,
2433 replica_id,
2434 name: replica.name.clone(),
2435 config: config.clone().into(),
2436 owner_id: replica.owner_id,
2437 },
2438 )?;
2439 }
2440 Op::UpdateItem { id, name, to_item } => {
2441 let mut entry = state.get_entry(&id).clone();
2442 entry.name = name.clone();
2443 entry.item = to_item.clone();
2444 tx.update_item(id, entry.into())?;
2445
2446 if Self::should_audit_log_item(&to_item) {
2447 let mut full_name = Self::full_name_detail(
2448 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2449 );
2450 full_name.item = name.item;
2451
2452 CatalogState::add_to_audit_log(
2453 &state.system_configuration,
2454 oracle_write_ts,
2455 session,
2456 tx,
2457 audit_events,
2458 EventType::Alter,
2459 catalog_type_to_audit_object_type(to_item.typ()),
2460 EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2461 id: id.to_string(),
2462 name: full_name,
2463 }),
2464 )?;
2465 }
2466
2467 Self::log_update(state, &id);
2468 }
2469 Op::UpdateSystemConfiguration { name, value } => {
2470 let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2471 tx.upsert_system_config(&name, parsed_value.clone())?;
2472 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2477 let with_0dt_deployment_max_wait =
2478 Duration::parse(VarInput::Flat(&parsed_value))
2479 .expect("parsing succeeded above");
2480 tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2481 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2482 let with_0dt_deployment_ddl_check_interval =
2483 Duration::parse(VarInput::Flat(&parsed_value))
2484 .expect("parsing succeeded above");
2485 tx.set_0dt_deployment_ddl_check_interval(
2486 with_0dt_deployment_ddl_check_interval,
2487 )?;
2488 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2489 let panic_after_timeout =
2490 strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2491 tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2492 }
2493
2494 CatalogState::add_to_audit_log(
2495 &state.system_configuration,
2496 oracle_write_ts,
2497 session,
2498 tx,
2499 audit_events,
2500 EventType::Alter,
2501 ObjectType::System,
2502 EventDetails::SetV1(mz_audit_log::SetV1 {
2503 name,
2504 value: Some(value.borrow().to_vec().join(", ")),
2505 }),
2506 )?;
2507 }
2508 Op::ResetSystemConfiguration { name } => {
2509 tx.remove_system_config(&name);
2510 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2515 tx.reset_0dt_deployment_max_wait()?;
2516 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2517 tx.reset_0dt_deployment_ddl_check_interval()?;
2518 }
2519
2520 CatalogState::add_to_audit_log(
2521 &state.system_configuration,
2522 oracle_write_ts,
2523 session,
2524 tx,
2525 audit_events,
2526 EventType::Alter,
2527 ObjectType::System,
2528 EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2529 )?;
2530 }
2531 Op::ResetAllSystemConfiguration => {
2532 tx.clear_system_configs();
2533 tx.reset_0dt_deployment_max_wait()?;
2534 tx.reset_0dt_deployment_ddl_check_interval()?;
2535
2536 CatalogState::add_to_audit_log(
2537 &state.system_configuration,
2538 oracle_write_ts,
2539 session,
2540 tx,
2541 audit_events,
2542 EventType::Alter,
2543 ObjectType::System,
2544 EventDetails::ResetAllV1,
2545 )?;
2546 }
2547 Op::WeirdStorageUsageUpdates {
2548 object_id,
2549 size_bytes,
2550 collection_timestamp,
2551 } => {
2552 let id = tx.allocate_storage_usage_ids()?;
2553 let metric =
2554 VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2555 let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2556 let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2557 weird_builtin_table_update = Some(builtin_table_update);
2558 }
2559 };
2560 Ok((weird_builtin_table_update, temporary_item_updates))
2561 }
2562
2563 fn log_update(state: &CatalogState, id: &CatalogItemId) {
2564 let entry = state.get_entry(id);
2565 info!(
2566 "update {} {} ({})",
2567 entry.item_type(),
2568 state.resolve_full_name(entry.name(), entry.conn_id()),
2569 id
2570 );
2571 }
2572
2573 fn update_privilege_owners(
2577 privileges: &mut PrivilegeMap,
2578 old_owner: RoleId,
2579 new_owner: RoleId,
2580 ) {
2581 let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2583
2584 let mut new_present = false;
2585 for privilege in flat_privileges.iter_mut() {
2586 if privilege.grantor == old_owner {
2589 privilege.grantor = new_owner;
2590 } else if privilege.grantor == new_owner {
2591 new_present = true;
2592 }
2593 if privilege.grantee == old_owner {
2595 privilege.grantee = new_owner;
2596 } else if privilege.grantee == new_owner {
2597 new_present = true;
2598 }
2599 }
2600
2601 if new_present {
2605 let privilege_map: BTreeMap<_, Vec<_>> =
2607 flat_privileges
2608 .into_iter()
2609 .fold(BTreeMap::new(), |mut accum, privilege| {
2610 accum
2611 .entry((privilege.grantee, privilege.grantor))
2612 .or_default()
2613 .push(privilege);
2614 accum
2615 });
2616
2617 flat_privileges = privilege_map
2619 .into_iter()
2620 .map(|((grantee, grantor), values)|
2621 values.into_iter().fold(
2623 MzAclItem::empty(grantee, grantor),
2624 |mut accum, mz_aclitem| {
2625 accum.acl_mode =
2626 accum.acl_mode.union(mz_aclitem.acl_mode);
2627 accum
2628 },
2629 ))
2630 .collect();
2631 }
2632
2633 *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2634 }
2635}
2636
2637fn tx_replace_item(
2646 tx: &mut Transaction<'_>,
2647 state: &CatalogState,
2648 id: CatalogItemId,
2649 new_entry: CatalogEntry,
2650) -> Result<(), AdapterError> {
2651 let new_id = new_entry.id;
2652
2653 for use_id in new_entry.referenced_by() {
2655 if tx.get_item(use_id).is_none() {
2657 continue;
2658 }
2659
2660 let mut dependent = state.get_entry(use_id).clone();
2661 dependent.item = dependent.item.replace_item_refs(id, new_id);
2662 tx.update_item(*use_id, dependent.into())?;
2663 }
2664
2665 let old_comment_id = state.get_comment_id(ObjectId::Item(id));
2667 let new_comment_id = new_entry.comment_object_id();
2668 if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
2669 tx.drop_comments(&[old_comment_id].into())?;
2670 for (sub, comment) in comments {
2671 tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
2672 }
2673 }
2674
2675 let mz_catalog::durable::Item {
2676 id: _,
2677 oid,
2678 global_id,
2679 schema_id,
2680 name,
2681 create_sql,
2682 owner_id,
2683 privileges,
2684 extra_versions,
2685 } = new_entry.into();
2686
2687 tx.remove_item(id)?;
2688 tx.insert_item(
2689 new_id,
2690 oid,
2691 global_id,
2692 schema_id,
2693 &name,
2694 create_sql,
2695 owner_id,
2696 privileges,
2697 extra_versions,
2698 )?;
2699
2700 Ok(())
2701}
2702
2703fn apply_replacement_audit_events(
2705 state: &CatalogState,
2706 target: &CatalogEntry,
2707 replacement: &CatalogEntry,
2708) -> Vec<(EventType, EventDetails)> {
2709 let mut events = Vec::new();
2710
2711 let target_name = state.resolve_full_name(target.name(), target.conn_id());
2712 let target_id_name = IdFullNameV1 {
2713 id: target.id().to_string(),
2714 name: Catalog::full_name_detail(&target_name),
2715 };
2716 let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2717 let replacement_id_name = IdFullNameV1 {
2718 id: replacement.id().to_string(),
2719 name: Catalog::full_name_detail(&replacement_name),
2720 };
2721
2722 if Catalog::should_audit_log_item(&replacement.item) {
2723 events.push((
2724 EventType::Drop,
2725 EventDetails::IdFullNameV1(replacement_id_name.clone()),
2726 ));
2727 }
2728
2729 if Catalog::should_audit_log_item(&target.item) {
2730 events.push((
2731 EventType::Alter,
2732 EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2733 target: target_id_name.clone(),
2734 replacement: replacement_id_name,
2735 }),
2736 ));
2737
2738 if let Some(old_cluster_id) = target.cluster_id()
2739 && let Some(new_cluster_id) = replacement.cluster_id()
2740 && old_cluster_id != new_cluster_id
2741 {
2742 events.push((
2745 EventType::Alter,
2746 EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
2747 id: replacement.id().to_string(),
2748 name: target_id_name.name,
2749 old_cluster_id: old_cluster_id.to_string(),
2750 new_cluster_id: new_cluster_id.to_string(),
2751 }),
2752 ));
2753 }
2754 }
2755
2756 events
2757}
2758
2759#[derive(Debug, Default)]
2767pub(crate) struct ObjectsToDrop {
2768 pub comments: BTreeSet<CommentObjectId>,
2769 pub databases: BTreeSet<DatabaseId>,
2770 pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2771 pub clusters: BTreeSet<ClusterId>,
2772 pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2773 pub roles: BTreeSet<RoleId>,
2774 pub items: Vec<CatalogItemId>,
2775 pub network_policies: BTreeSet<NetworkPolicyId>,
2776}
2777
2778impl ObjectsToDrop {
2779 pub fn generate(
2780 drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2781 state: &CatalogState,
2782 session: Option<&ConnMeta>,
2783 ) -> Result<Self, AdapterError> {
2784 let mut delta = ObjectsToDrop::default();
2785
2786 for drop_object_info in drop_object_infos {
2787 delta.add_item(drop_object_info, state, session)?;
2788 }
2789
2790 Ok(delta)
2791 }
2792
2793 fn add_item(
2794 &mut self,
2795 drop_object_info: DropObjectInfo,
2796 state: &CatalogState,
2797 session: Option<&ConnMeta>,
2798 ) -> Result<(), AdapterError> {
2799 self.comments
2800 .insert(state.get_comment_id(drop_object_info.to_object_id()));
2801
2802 match drop_object_info {
2803 DropObjectInfo::Database(database_id) => {
2804 let database = &state.database_by_id[&database_id];
2805 if database_id.is_system() {
2806 return Err(AdapterError::Catalog(Error::new(
2807 ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2808 )));
2809 }
2810
2811 self.databases.insert(database_id);
2812 }
2813 DropObjectInfo::Schema((database_spec, schema_spec)) => {
2814 let schema = state.get_schema(
2815 &database_spec,
2816 &schema_spec,
2817 session
2818 .map(|session| session.conn_id())
2819 .unwrap_or(&SYSTEM_CONN_ID),
2820 );
2821 let schema_id: SchemaId = schema_spec.into();
2822 if schema_id.is_system() {
2823 let name = schema.name();
2824 let full_name = state.resolve_full_schema_name(name);
2825 return Err(AdapterError::Catalog(Error::new(
2826 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2827 )));
2828 }
2829
2830 self.schemas.insert(schema_spec, database_spec);
2831 }
2832 DropObjectInfo::Role(role_id) => {
2833 let name = state.get_role(&role_id).name().to_string();
2834 if role_id.is_system() || role_id.is_predefined() {
2835 return Err(AdapterError::Catalog(Error::new(
2836 ErrorKind::ReservedRoleName(name.clone()),
2837 )));
2838 }
2839 state.ensure_not_reserved_role(&role_id)?;
2840
2841 self.roles.insert(role_id);
2842 }
2843 DropObjectInfo::Cluster(cluster_id) => {
2844 let cluster = state.get_cluster(cluster_id);
2845 let name = &cluster.name;
2846 if cluster_id.is_system() {
2847 return Err(AdapterError::Catalog(Error::new(
2848 ErrorKind::ReadOnlyCluster(name.clone()),
2849 )));
2850 }
2851
2852 self.clusters.insert(cluster_id);
2853 }
2854 DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2855 let cluster = state.get_cluster(cluster_id);
2856 let replica = cluster.replica(replica_id).expect("Must exist");
2857
2858 self.replicas
2859 .insert(replica.replica_id, (cluster.id, reason));
2860 }
2861 DropObjectInfo::Item(item_id) => {
2862 let entry = state.get_entry(&item_id);
2863 if item_id.is_system() {
2864 let name = entry.name();
2865 let full_name =
2866 state.resolve_full_name(name, session.map(|session| session.conn_id()));
2867 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2868 full_name.to_string(),
2869 ))));
2870 }
2871
2872 self.items.push(item_id);
2873 }
2874 DropObjectInfo::NetworkPolicy(network_policy_id) => {
2875 let policy = state.get_network_policy(&network_policy_id);
2876 let name = &policy.name;
2877 if network_policy_id.is_system() {
2878 return Err(AdapterError::Catalog(Error::new(
2879 ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
2880 )));
2881 }
2882
2883 self.network_policies.insert(network_policy_id);
2884 }
2885 }
2886
2887 Ok(())
2888 }
2889}
2890
#[cfg(test)]
mod tests {
    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
    use mz_repr::role_id::RoleId;

    use crate::catalog::Catalog;

    /// Checks that transferring ownership merges privileges that end up sharing the
    /// same grantee and grantor into a single item with the union of their modes.
    #[mz_ore::test]
    fn test_update_privilege_owners() {
        let old_owner = RoleId::User(1);
        let new_owner = RoleId::User(2);
        let other_role = RoleId::User(3);

        let acl = |grantee: RoleId, grantor: RoleId, acl_mode: AclMode| MzAclItem {
            grantee,
            grantor,
            acl_mode,
        };
        let select_and_update = || AclMode::SELECT.union(AclMode::UPDATE);

        // Each case pairs the initial privileges with the single item expected after
        // ownership moves from `old_owner` to `new_owner`.
        let cases = [
            // Old and new owner both granted to the same third role.
            (
                vec![
                    acl(other_role, old_owner, AclMode::UPDATE),
                    acl(other_role, new_owner, AclMode::SELECT),
                ],
                acl(other_role, new_owner, select_and_update()),
            ),
            // Old and new owner were both grantees of the same third role.
            (
                vec![
                    acl(old_owner, other_role, AclMode::UPDATE),
                    acl(new_owner, other_role, AclMode::SELECT),
                ],
                acl(new_owner, other_role, select_and_update()),
            ),
            // Old and new owner each held a self-granted privilege.
            (
                vec![
                    acl(old_owner, old_owner, AclMode::UPDATE),
                    acl(new_owner, new_owner, AclMode::SELECT),
                ],
                acl(new_owner, new_owner, select_and_update()),
            ),
        ];

        for (initial, expected) in cases {
            let mut privileges = PrivilegeMap::from_mz_acl_items(initial);
            Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
            assert_eq!(1, privileges.all_values().count());
            assert_eq!(
                vec![expected],
                privileges.all_values_owned().collect::<Vec<_>>()
            );
        }
    }
}