1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22 WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26 ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Transaction};
31use mz_catalog::expr_cache::LocalExpressions;
32use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
33use mz_catalog::memory::objects::{
34 CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, SourceReferences, StateDiff,
35 StateUpdate, StateUpdateKind, TemporaryItem,
36};
37use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
38use mz_controller_types::{ClusterId, ReplicaId};
39use mz_ore::collections::HashSet;
40use mz_ore::instrument;
41use mz_ore::now::EpochMillis;
42use mz_persist_types::ShardId;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
44use mz_repr::network_policy_id::NetworkPolicyId;
45use mz_repr::optimize::OptimizerFeatures;
46use mz_repr::role_id::RoleId;
47use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
48use mz_sql::ast::RawDataType;
49use mz_sql::catalog::{
50 CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
51 CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, PasswordAction, PasswordConfig,
52 RoleAttributesRaw, RoleMembership, RoleVars,
53};
54use mz_sql::names::{
55 CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
56 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
57};
58use mz_sql::plan::{NetworkPolicyRule, PlanError};
59use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
60use mz_sql::session::vars::OwnedVarInput;
61use mz_sql::session::vars::{Value as VarValue, VarInput};
62use mz_sql::{DEFAULT_SCHEMA, rbac};
63use mz_sql_parser::ast::{QualifiedReplica, Value};
64use mz_storage_client::storage_collections::StorageCollections;
65use tracing::{info, trace};
66
67use crate::AdapterError;
68use crate::catalog::state::LocalExpressionCache;
69use crate::catalog::{
70 BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
71 catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
72 is_reserved_role_name, object_type_to_audit_object_type,
73 system_object_type_to_audit_object_type,
74};
75use crate::coord::ConnMeta;
76use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
77use crate::coord::cluster_scheduling::SchedulingDecision;
78use crate::util::ResultExt;
79
80#[derive(Debug, Clone)]
81pub enum Op {
82 AlterRetainHistory {
83 id: CatalogItemId,
84 value: Option<Value>,
85 window: CompactionWindow,
86 },
87 AlterRole {
88 id: RoleId,
89 name: String,
90 attributes: RoleAttributesRaw,
91 nopassword: bool,
92 vars: RoleVars,
93 },
94 AlterNetworkPolicy {
95 id: NetworkPolicyId,
96 rules: Vec<NetworkPolicyRule>,
97 name: String,
98 owner_id: RoleId,
99 },
100 AlterAddColumn {
101 id: CatalogItemId,
102 new_global_id: GlobalId,
103 name: ColumnName,
104 typ: SqlColumnType,
105 sql: RawDataType,
106 },
107 AlterMaterializedViewApplyReplacement {
108 id: CatalogItemId,
109 replacement_id: CatalogItemId,
110 },
111 CreateDatabase {
112 name: String,
113 owner_id: RoleId,
114 },
115 CreateSchema {
116 database_id: ResolvedDatabaseSpecifier,
117 schema_name: String,
118 owner_id: RoleId,
119 },
120 CreateRole {
121 name: String,
122 attributes: RoleAttributesRaw,
123 },
124 CreateCluster {
125 id: ClusterId,
126 name: String,
127 introspection_sources: Vec<&'static BuiltinLog>,
128 owner_id: RoleId,
129 config: ClusterConfig,
130 },
131 CreateClusterReplica {
132 cluster_id: ClusterId,
133 name: String,
134 config: ReplicaConfig,
135 owner_id: RoleId,
136 reason: ReplicaCreateDropReason,
137 },
138 CreateItem {
139 id: CatalogItemId,
140 name: QualifiedItemName,
141 item: CatalogItem,
142 owner_id: RoleId,
143 },
144 CreateNetworkPolicy {
145 rules: Vec<NetworkPolicyRule>,
146 name: String,
147 owner_id: RoleId,
148 },
149 Comment {
150 object_id: CommentObjectId,
151 sub_component: Option<usize>,
152 comment: Option<String>,
153 },
154 DropObjects(Vec<DropObjectInfo>),
155 GrantRole {
156 role_id: RoleId,
157 member_id: RoleId,
158 grantor_id: RoleId,
159 },
160 RenameCluster {
161 id: ClusterId,
162 name: String,
163 to_name: String,
164 check_reserved_names: bool,
165 },
166 RenameClusterReplica {
167 cluster_id: ClusterId,
168 replica_id: ReplicaId,
169 name: QualifiedReplica,
170 to_name: String,
171 },
172 RenameItem {
173 id: CatalogItemId,
174 current_full_name: FullItemName,
175 to_name: String,
176 },
177 RenameSchema {
178 database_spec: ResolvedDatabaseSpecifier,
179 schema_spec: SchemaSpecifier,
180 new_name: String,
181 check_reserved_names: bool,
182 },
183 UpdateOwner {
184 id: ObjectId,
185 new_owner: RoleId,
186 },
187 UpdatePrivilege {
188 target_id: SystemObjectId,
189 privilege: MzAclItem,
190 variant: UpdatePrivilegeVariant,
191 },
192 UpdateDefaultPrivilege {
193 privilege_object: DefaultPrivilegeObject,
194 privilege_acl_item: DefaultPrivilegeAclItem,
195 variant: UpdatePrivilegeVariant,
196 },
197 RevokeRole {
198 role_id: RoleId,
199 member_id: RoleId,
200 grantor_id: RoleId,
201 },
202 UpdateClusterConfig {
203 id: ClusterId,
204 name: String,
205 config: ClusterConfig,
206 },
207 UpdateClusterReplicaConfig {
208 cluster_id: ClusterId,
209 replica_id: ReplicaId,
210 config: ReplicaConfig,
211 },
212 UpdateItem {
213 id: CatalogItemId,
214 name: QualifiedItemName,
215 to_item: CatalogItem,
216 },
217 UpdateSourceReferences {
218 source_id: CatalogItemId,
219 references: SourceReferences,
220 },
221 UpdateSystemConfiguration {
222 name: String,
223 value: OwnedVarInput,
224 },
225 ResetSystemConfiguration {
226 name: String,
227 },
228 ResetAllSystemConfiguration,
229 WeirdStorageUsageUpdates {
236 object_id: Option<String>,
237 size_bytes: u64,
238 collection_timestamp: EpochMillis,
239 },
240 TransactionDryRun,
246}
247
248#[derive(Debug, Clone)]
252pub enum DropObjectInfo {
253 Cluster(ClusterId),
254 ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
255 Database(DatabaseId),
256 Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
257 Role(RoleId),
258 Item(CatalogItemId),
259 NetworkPolicy(NetworkPolicyId),
260}
261
262impl DropObjectInfo {
263 pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
266 match id {
267 ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
268 ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
269 cluster_id,
270 replica_id,
271 ReplicaCreateDropReason::Manual,
272 )),
273 ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
274 ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
275 ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
276 ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
277 ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
278 }
279 }
280
281 fn to_object_id(&self) -> ObjectId {
284 match &self {
285 DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
286 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
287 ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
288 }
289 DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
290 DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
291 DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
292 DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
293 DropObjectInfo::NetworkPolicy(network_policy_id) => {
294 ObjectId::NetworkPolicy(network_policy_id.clone())
295 }
296 }
297 }
298}
299
300#[derive(Debug, Clone)]
302pub enum ReplicaCreateDropReason {
303 Manual,
308 ClusterScheduling(Vec<SchedulingDecision>),
311}
312
313impl ReplicaCreateDropReason {
314 pub fn into_audit_log(
315 self,
316 ) -> (
317 CreateOrDropClusterReplicaReasonV1,
318 Option<SchedulingDecisionsWithReasonsV2>,
319 ) {
320 let (reason, scheduling_policies) = match self {
321 ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
322 ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
323 CreateOrDropClusterReplicaReasonV1::Schedule,
324 Some(scheduling_decisions),
325 ),
326 };
327 (
328 reason,
329 scheduling_policies
330 .as_ref()
331 .map(SchedulingDecision::reasons_to_audit_log_reasons),
332 )
333 }
334}
335
336pub struct TransactionResult {
337 pub builtin_table_updates: Vec<BuiltinTableUpdate>,
338 pub catalog_updates: Vec<ParsedStateUpdate>,
340 pub audit_events: Vec<VersionedEvent>,
341}
342
343impl Catalog {
344 fn should_audit_log_item(item: &CatalogItem) -> bool {
345 !item.is_temporary()
346 }
347
348 fn temporary_ids(
351 &self,
352 ops: &[Op],
353 temporary_drops: BTreeSet<(&ConnectionId, String)>,
354 ) -> Result<BTreeSet<CatalogItemId>, Error> {
355 let mut creating = BTreeSet::new();
356 let mut temporary_ids = BTreeSet::new();
357 for op in ops.iter() {
358 if let Op::CreateItem {
359 id,
360 name,
361 item,
362 owner_id: _,
363 } = op
364 {
365 if let Some(conn_id) = item.conn_id() {
366 if self.item_exists_in_temp_schemas(conn_id, &name.item)
367 && !temporary_drops.contains(&(conn_id, name.item.clone()))
368 || creating.contains(&(conn_id, &name.item))
369 {
370 return Err(
371 SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
372 );
373 } else {
374 creating.insert((conn_id, &name.item));
375 temporary_ids.insert(id.clone());
376 }
377 }
378 }
379 }
380 Ok(temporary_ids)
381 }
382
383 #[instrument(name = "catalog::transact")]
384 pub async fn transact(
385 &mut self,
386 storage_collections: Option<
389 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
390 >,
391 oracle_write_ts: mz_repr::Timestamp,
392 session: Option<&ConnMeta>,
393 ops: Vec<Op>,
394 ) -> Result<TransactionResult, AdapterError> {
395 trace!("transact: {:?}", ops);
396 fail::fail_point!("catalog_transact", |arg| {
397 Err(AdapterError::Unstructured(anyhow::anyhow!(
398 "failpoint: {arg:?}"
399 )))
400 });
401
402 let drop_ids: BTreeSet<CatalogItemId> = ops
403 .iter()
404 .filter_map(|op| match op {
405 Op::DropObjects(drop_object_infos) => {
406 let ids = drop_object_infos.iter().map(|info| info.to_object_id());
407 let item_ids = ids.filter_map(|id| match id {
408 ObjectId::Item(id) => Some(id),
409 _ => None,
410 });
411 Some(item_ids)
412 }
413 _ => None,
414 })
415 .flatten()
416 .collect();
417 let temporary_drops = drop_ids
418 .iter()
419 .filter_map(|id| {
420 let entry = self.get_entry(id);
421 match entry.item().conn_id() {
422 Some(conn_id) => Some((conn_id, entry.name().item.clone())),
423 None => None,
424 }
425 })
426 .collect();
427 let dropped_global_ids = drop_ids
428 .iter()
429 .flat_map(|item_id| self.get_global_ids(item_id))
430 .collect();
431
432 let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
433 let mut builtin_table_updates = vec![];
434 let mut catalog_updates = vec![];
435 let mut audit_events = vec![];
436 let mut storage = self.storage().await;
437 let mut tx = storage
438 .transaction()
439 .await
440 .unwrap_or_terminate("starting catalog transaction");
441
442 let new_state = Self::transact_inner(
443 storage_collections,
444 oracle_write_ts,
445 session,
446 ops,
447 temporary_ids,
448 &mut builtin_table_updates,
449 &mut catalog_updates,
450 &mut audit_events,
451 &mut tx,
452 &self.state,
453 )
454 .await?;
455
456 tx.commit(oracle_write_ts)
461 .await
462 .unwrap_or_terminate("catalog storage transaction commit must succeed");
463
464 drop(storage);
467 if let Some(new_state) = new_state {
468 self.transient_revision += 1;
469 self.state = new_state;
470 }
471
472 let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
474 if self.state.system_config().enable_mz_notices() {
475 self.state().pack_optimizer_notices(
477 &mut builtin_table_updates,
478 dropped_notices.iter(),
479 Diff::MINUS_ONE,
480 );
481 }
482
483 Ok(TransactionResult {
484 builtin_table_updates,
485 catalog_updates,
486 audit_events,
487 })
488 }
489
490 fn extract_expressions_from_ops(
494 ops: &[Op],
495 optimizer_features: &OptimizerFeatures,
496 ) -> BTreeMap<GlobalId, LocalExpressions> {
497 let mut exprs = BTreeMap::new();
498
499 for op in ops {
500 if let Op::CreateItem { item, .. } = op {
501 match item {
502 CatalogItem::View(view) => {
503 exprs.insert(
504 view.global_id,
505 LocalExpressions {
506 local_mir: (*view.optimized_expr).clone(),
507 optimizer_features: optimizer_features.clone(),
508 },
509 );
510 }
511 CatalogItem::MaterializedView(mv) => {
512 exprs.insert(
513 mv.global_id_writes(),
514 LocalExpressions {
515 local_mir: (*mv.optimized_expr).clone(),
516 optimizer_features: optimizer_features.clone(),
517 },
518 );
519 }
520 _ => {}
521 }
522 }
523 }
524
525 exprs
526 }
527
528 #[instrument(name = "catalog::transact_inner")]
536 async fn transact_inner(
537 storage_collections: Option<
538 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
539 >,
540 oracle_write_ts: mz_repr::Timestamp,
541 session: Option<&ConnMeta>,
542 mut ops: Vec<Op>,
543 temporary_ids: BTreeSet<CatalogItemId>,
544 builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
545 parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
546 audit_events: &mut Vec<VersionedEvent>,
547 tx: &mut Transaction<'_>,
548 state: &CatalogState,
549 ) -> Result<Option<CatalogState>, AdapterError> {
550 let mut preliminary_state = Cow::Borrowed(state);
577
578 let mut state = Cow::Borrowed(state);
580
581 let dry_run_ops = match ops.last() {
582 Some(Op::TransactionDryRun) => {
583 ops.pop();
585 assert!(!ops.is_empty(), "TransactionDryRun must not be the only op");
586 ops.clone()
587 }
588 Some(_) => vec![],
589 None => return Ok(None),
590 };
591
592 let optimizer_features = OptimizerFeatures::from(state.system_config());
595 let cached_exprs = Self::extract_expressions_from_ops(&ops, &optimizer_features);
596
597 let mut storage_collections_to_create = BTreeSet::new();
598 let mut storage_collections_to_drop = BTreeSet::new();
599 let mut storage_collections_to_register = BTreeMap::new();
600
601 let mut updates = Vec::new();
602
603 for op in ops {
604 let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
605 oracle_write_ts,
606 session,
607 op,
608 &temporary_ids,
609 audit_events,
610 tx,
611 &*preliminary_state,
612 &mut storage_collections_to_create,
613 &mut storage_collections_to_drop,
614 &mut storage_collections_to_register,
615 )
616 .await?;
617
618 if let Some(builtin_table_update) = weird_builtin_table_update {
627 builtin_table_updates.push(builtin_table_update);
628 }
629
630 let upper = tx.upper();
635 let temporary_item_updates =
636 temporary_item_updates
637 .into_iter()
638 .map(|(item, diff)| StateUpdate {
639 kind: StateUpdateKind::TemporaryItem(item),
640 ts: upper,
641 diff,
642 });
643
644 let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
645 op_updates.extend(temporary_item_updates);
646 if !op_updates.is_empty() {
647 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
650 let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
651 .to_mut()
652 .apply_updates(op_updates.clone(), &mut local_expr_cache)
653 .await;
654 }
655 updates.append(&mut op_updates);
656 }
657
658 if !updates.is_empty() {
659 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
660 let (op_builtin_table_updates, op_catalog_updates) = state
661 .to_mut()
662 .apply_updates(updates.clone(), &mut local_expr_cache)
663 .await;
664 let op_builtin_table_updates = state
665 .to_mut()
666 .resolve_builtin_table_updates(op_builtin_table_updates);
667 builtin_table_updates.extend(op_builtin_table_updates);
668 parsed_catalog_updates.extend(op_catalog_updates);
669 }
670
671 if dry_run_ops.is_empty() {
672 if let Some(c) = storage_collections {
674 c.prepare_state(
675 tx,
676 storage_collections_to_create,
677 storage_collections_to_drop,
678 storage_collections_to_register,
679 )
680 .await?;
681 }
682
683 let updates = tx.get_and_commit_op_updates();
684 if !updates.is_empty() {
685 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
686 let (op_builtin_table_updates, op_catalog_updates) = state
687 .to_mut()
688 .apply_updates(updates.clone(), &mut local_expr_cache)
689 .await;
690 let op_builtin_table_updates = state
691 .to_mut()
692 .resolve_builtin_table_updates(op_builtin_table_updates);
693 builtin_table_updates.extend(op_builtin_table_updates);
694 parsed_catalog_updates.extend(op_catalog_updates);
695 }
696
697 match state {
698 Cow::Owned(state) => Ok(Some(state)),
699 Cow::Borrowed(_) => Ok(None),
700 }
701 } else {
702 Err(AdapterError::TransactionDryRun {
703 new_ops: dry_run_ops,
704 new_state: state.into_owned(),
705 })
706 }
707 }
708
709 #[instrument]
717 async fn transact_op(
718 oracle_write_ts: mz_repr::Timestamp,
719 session: Option<&ConnMeta>,
720 op: Op,
721 temporary_ids: &BTreeSet<CatalogItemId>,
722 audit_events: &mut Vec<VersionedEvent>,
723 tx: &mut Transaction<'_>,
724 state: &CatalogState,
725 storage_collections_to_create: &mut BTreeSet<GlobalId>,
726 storage_collections_to_drop: &mut BTreeSet<GlobalId>,
727 storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
728 ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
729 let mut weird_builtin_table_update = None;
730 let mut temporary_item_updates = Vec::new();
731
732 match op {
733 Op::TransactionDryRun => {
734 unreachable!("TransactionDryRun can only be used a final element of ops")
735 }
736 Op::AlterRetainHistory { id, value, window } => {
737 let entry = state.get_entry(&id);
738 if id.is_system() {
739 let name = entry.name();
740 let full_name =
741 state.resolve_full_name(name, session.map(|session| session.conn_id()));
742 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
743 full_name.to_string(),
744 ))));
745 }
746
747 let mut new_entry = entry.clone();
748 let previous = new_entry
749 .item
750 .update_retain_history(value.clone(), window)
751 .map_err(|_| {
752 AdapterError::Catalog(Error::new(ErrorKind::Internal(
753 "planner should have rejected invalid alter retain history item type"
754 .to_string(),
755 )))
756 })?;
757
758 if Self::should_audit_log_item(new_entry.item()) {
759 let details =
760 EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
761 id: id.to_string(),
762 old_history: previous.map(|previous| previous.to_string()),
763 new_history: value.map(|v| v.to_string()),
764 });
765 CatalogState::add_to_audit_log(
766 &state.system_configuration,
767 oracle_write_ts,
768 session,
769 tx,
770 audit_events,
771 EventType::Alter,
772 catalog_type_to_audit_object_type(new_entry.item().typ()),
773 details,
774 )?;
775 }
776
777 tx.update_item(id, new_entry.into())?;
778
779 Self::log_update(state, &id);
780 }
781 Op::AlterRole {
782 id,
783 name,
784 attributes,
785 nopassword,
786 vars,
787 } => {
788 state.ensure_not_reserved_role(&id)?;
789
790 let mut existing_role = state.get_role(&id).clone();
791 let password = attributes.password.clone();
792 let scram_iterations = attributes
793 .scram_iterations
794 .unwrap_or_else(|| state.system_config().scram_iterations());
795 existing_role.attributes = attributes.into();
796 existing_role.vars = vars;
797 let password_action = if nopassword {
798 PasswordAction::Clear
799 } else if let Some(password) = password {
800 PasswordAction::Set(PasswordConfig {
801 password,
802 scram_iterations,
803 })
804 } else {
805 PasswordAction::NoChange
806 };
807 tx.update_role(id, existing_role.into(), password_action)?;
808
809 CatalogState::add_to_audit_log(
810 &state.system_configuration,
811 oracle_write_ts,
812 session,
813 tx,
814 audit_events,
815 EventType::Alter,
816 ObjectType::Role,
817 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
818 id: id.to_string(),
819 name: name.clone(),
820 }),
821 )?;
822
823 info!("update role {name} ({id})");
824 }
825 Op::AlterNetworkPolicy {
826 id,
827 rules,
828 name,
829 owner_id: _owner_id,
830 } => {
831 let existing_policy = state.get_network_policy(&id).clone();
832 let mut policy: NetworkPolicy = existing_policy.into();
833 policy.rules = rules;
834 if is_reserved_name(&name) {
835 return Err(AdapterError::Catalog(Error::new(
836 ErrorKind::ReservedNetworkPolicyName(name),
837 )));
838 }
839 tx.update_network_policy(id, policy.clone())?;
840
841 CatalogState::add_to_audit_log(
842 &state.system_configuration,
843 oracle_write_ts,
844 session,
845 tx,
846 audit_events,
847 EventType::Alter,
848 ObjectType::NetworkPolicy,
849 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
850 id: id.to_string(),
851 name: name.clone(),
852 }),
853 )?;
854
855 info!("update network policy {name} ({id})");
856 }
857 Op::AlterAddColumn {
858 id,
859 new_global_id,
860 name,
861 typ,
862 sql,
863 } => {
864 let mut new_entry = state.get_entry(&id).clone();
865 let version = new_entry.item.add_column(name, typ, sql)?;
866 let shard_id = state
869 .storage_metadata()
870 .get_collection_shard(new_entry.latest_global_id())?;
871
872 let CatalogItem::Table(table) = &mut new_entry.item else {
874 return Err(AdapterError::Unsupported("adding columns to non-Table"));
875 };
876 table.collections.insert(version, new_global_id);
877
878 tx.update_item(id, new_entry.into())?;
879 storage_collections_to_register.insert(new_global_id, shard_id);
880 }
881 Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
882 let mut new_entry = state.get_entry(&id).clone();
883 let replacement = state.get_entry(&replacement_id);
884
885 let new_audit_events =
886 apply_replacement_audit_events(state, &new_entry, replacement);
887
888 let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
889 return Err(AdapterError::internal(
890 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
891 "id must refer to a materialized view",
892 ));
893 };
894 let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
895 return Err(AdapterError::internal(
896 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
897 "replacement_id must refer to a materialized view",
898 ));
899 };
900
901 mv.apply_replacement(replacement_mv.clone());
902
903 tx.remove_item(replacement_id)?;
904
905 new_entry.id = replacement_id;
906 tx_replace_item(tx, state, id, new_entry)?;
907
908 let comment_id = CommentObjectId::MaterializedView(replacement_id);
909 tx.drop_comments(&[comment_id].into())?;
910
911 for (event_type, details) in new_audit_events {
912 CatalogState::add_to_audit_log(
913 &state.system_configuration,
914 oracle_write_ts,
915 session,
916 tx,
917 audit_events,
918 event_type,
919 ObjectType::MaterializedView,
920 details,
921 )?;
922 }
923 }
924 Op::CreateDatabase { name, owner_id } => {
925 let database_owner_privileges = vec![rbac::owner_privilege(
926 mz_sql::catalog::ObjectType::Database,
927 owner_id,
928 )];
929 let database_default_privileges = state
930 .default_privileges
931 .get_applicable_privileges(
932 owner_id,
933 None,
934 None,
935 mz_sql::catalog::ObjectType::Database,
936 )
937 .map(|item| item.mz_acl_item(owner_id));
938 let database_privileges: Vec<_> = merge_mz_acl_items(
939 database_owner_privileges
940 .into_iter()
941 .chain(database_default_privileges),
942 )
943 .collect();
944
945 let schema_owner_privileges = vec![rbac::owner_privilege(
946 mz_sql::catalog::ObjectType::Schema,
947 owner_id,
948 )];
949 let schema_default_privileges = state
950 .default_privileges
951 .get_applicable_privileges(
952 owner_id,
953 None,
954 None,
955 mz_sql::catalog::ObjectType::Schema,
956 )
957 .map(|item| item.mz_acl_item(owner_id))
958 .chain(std::iter::once(MzAclItem {
960 grantee: RoleId::Public,
961 grantor: owner_id,
962 acl_mode: AclMode::USAGE,
963 }));
964 let schema_privileges: Vec<_> = merge_mz_acl_items(
965 schema_owner_privileges
966 .into_iter()
967 .chain(schema_default_privileges),
968 )
969 .collect();
970
971 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
972 let (database_id, _) = tx.insert_user_database(
973 &name,
974 owner_id,
975 database_privileges.clone(),
976 &temporary_oids,
977 )?;
978 let (schema_id, _) = tx.insert_user_schema(
979 database_id,
980 DEFAULT_SCHEMA,
981 owner_id,
982 schema_privileges.clone(),
983 &temporary_oids,
984 )?;
985 CatalogState::add_to_audit_log(
986 &state.system_configuration,
987 oracle_write_ts,
988 session,
989 tx,
990 audit_events,
991 EventType::Create,
992 ObjectType::Database,
993 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
994 id: database_id.to_string(),
995 name: name.clone(),
996 }),
997 )?;
998 info!("create database {}", name);
999
1000 CatalogState::add_to_audit_log(
1001 &state.system_configuration,
1002 oracle_write_ts,
1003 session,
1004 tx,
1005 audit_events,
1006 EventType::Create,
1007 ObjectType::Schema,
1008 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1009 id: schema_id.to_string(),
1010 name: DEFAULT_SCHEMA.to_string(),
1011 database_name: Some(name),
1012 }),
1013 )?;
1014 }
1015 Op::CreateSchema {
1016 database_id,
1017 schema_name,
1018 owner_id,
1019 } => {
1020 if is_reserved_name(&schema_name) {
1021 return Err(AdapterError::Catalog(Error::new(
1022 ErrorKind::ReservedSchemaName(schema_name),
1023 )));
1024 }
1025 let database_id = match database_id {
1026 ResolvedDatabaseSpecifier::Id(id) => id,
1027 ResolvedDatabaseSpecifier::Ambient => {
1028 return Err(AdapterError::Catalog(Error::new(
1029 ErrorKind::ReadOnlySystemSchema(schema_name),
1030 )));
1031 }
1032 };
1033 let owner_privileges = vec![rbac::owner_privilege(
1034 mz_sql::catalog::ObjectType::Schema,
1035 owner_id,
1036 )];
1037 let default_privileges = state
1038 .default_privileges
1039 .get_applicable_privileges(
1040 owner_id,
1041 Some(database_id),
1042 None,
1043 mz_sql::catalog::ObjectType::Schema,
1044 )
1045 .map(|item| item.mz_acl_item(owner_id));
1046 let privileges: Vec<_> =
1047 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1048 .collect();
1049 let (schema_id, _) = tx.insert_user_schema(
1050 database_id,
1051 &schema_name,
1052 owner_id,
1053 privileges.clone(),
1054 &state.get_temporary_oids().collect(),
1055 )?;
1056 CatalogState::add_to_audit_log(
1057 &state.system_configuration,
1058 oracle_write_ts,
1059 session,
1060 tx,
1061 audit_events,
1062 EventType::Create,
1063 ObjectType::Schema,
1064 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1065 id: schema_id.to_string(),
1066 name: schema_name.clone(),
1067 database_name: Some(state.database_by_id[&database_id].name.clone()),
1068 }),
1069 )?;
1070 }
1071 Op::CreateRole { name, attributes } => {
1072 if is_reserved_role_name(&name) {
1073 return Err(AdapterError::Catalog(Error::new(
1074 ErrorKind::ReservedRoleName(name),
1075 )));
1076 }
1077 let membership = RoleMembership::new();
1078 let vars = RoleVars::default();
1079 let (id, _) = tx.insert_user_role(
1080 name.clone(),
1081 attributes.clone(),
1082 membership.clone(),
1083 vars.clone(),
1084 &state.get_temporary_oids().collect(),
1085 )?;
1086 CatalogState::add_to_audit_log(
1087 &state.system_configuration,
1088 oracle_write_ts,
1089 session,
1090 tx,
1091 audit_events,
1092 EventType::Create,
1093 ObjectType::Role,
1094 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1095 id: id.to_string(),
1096 name: name.clone(),
1097 }),
1098 )?;
1099 info!("create role {}", name);
1100 }
1101 Op::CreateCluster {
1102 id,
1103 name,
1104 introspection_sources,
1105 owner_id,
1106 config,
1107 } => {
1108 if is_reserved_name(&name) {
1109 return Err(AdapterError::Catalog(Error::new(
1110 ErrorKind::ReservedClusterName(name),
1111 )));
1112 }
1113 let owner_privileges = vec![rbac::owner_privilege(
1114 mz_sql::catalog::ObjectType::Cluster,
1115 owner_id,
1116 )];
1117 let default_privileges = state
1118 .default_privileges
1119 .get_applicable_privileges(
1120 owner_id,
1121 None,
1122 None,
1123 mz_sql::catalog::ObjectType::Cluster,
1124 )
1125 .map(|item| item.mz_acl_item(owner_id));
1126 let privileges: Vec<_> =
1127 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1128 .collect();
1129 let introspection_source_ids: Vec<_> = introspection_sources
1130 .iter()
1131 .map(|introspection_source| {
1132 Transaction::allocate_introspection_source_index_id(
1133 &id,
1134 introspection_source.variant,
1135 )
1136 })
1137 .collect();
1138
1139 let introspection_sources = introspection_sources
1140 .into_iter()
1141 .zip_eq(introspection_source_ids)
1142 .map(|(log, (item_id, gid))| (log, item_id, gid))
1143 .collect();
1144
1145 tx.insert_user_cluster(
1146 id,
1147 &name,
1148 introspection_sources,
1149 owner_id,
1150 privileges.clone(),
1151 config.clone().into(),
1152 &state.get_temporary_oids().collect(),
1153 )?;
1154 CatalogState::add_to_audit_log(
1155 &state.system_configuration,
1156 oracle_write_ts,
1157 session,
1158 tx,
1159 audit_events,
1160 EventType::Create,
1161 ObjectType::Cluster,
1162 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1163 id: id.to_string(),
1164 name: name.clone(),
1165 }),
1166 )?;
1167 info!("create cluster {}", name);
1168 }
1169 Op::CreateClusterReplica {
1170 cluster_id,
1171 name,
1172 config,
1173 owner_id,
1174 reason,
1175 } => {
1176 if is_reserved_name(&name) {
1177 return Err(AdapterError::Catalog(Error::new(
1178 ErrorKind::ReservedReplicaName(name),
1179 )));
1180 }
1181 let cluster = state.get_cluster(cluster_id);
1182 let id =
1183 tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1184 if let ReplicaLocation::Managed(ManagedReplicaLocation {
1185 size,
1186 billed_as,
1187 internal,
1188 ..
1189 }) = &config.location
1190 {
1191 let (reason, scheduling_policies) = reason.into_audit_log();
1192 let details = EventDetails::CreateClusterReplicaV4(
1193 mz_audit_log::CreateClusterReplicaV4 {
1194 cluster_id: cluster_id.to_string(),
1195 cluster_name: cluster.name.clone(),
1196 replica_id: Some(id.to_string()),
1197 replica_name: name.clone(),
1198 logical_size: size.clone(),
1199 billed_as: billed_as.clone(),
1200 internal: *internal,
1201 reason,
1202 scheduling_policies,
1203 },
1204 );
1205 CatalogState::add_to_audit_log(
1206 &state.system_configuration,
1207 oracle_write_ts,
1208 session,
1209 tx,
1210 audit_events,
1211 EventType::Create,
1212 ObjectType::ClusterReplica,
1213 details,
1214 )?;
1215 }
1216 }
1217 Op::CreateItem {
1218 id,
1219 name,
1220 item,
1221 owner_id,
1222 } => {
1223 state.check_unstable_dependencies(&item)?;
1224
1225 match &item {
1226 CatalogItem::Table(table) => {
1227 let gids: Vec<_> = table.global_ids().collect();
1228 assert_eq!(gids.len(), 1);
1229 storage_collections_to_create.extend(gids);
1230 }
1231 CatalogItem::Source(source) => {
1232 storage_collections_to_create.insert(source.global_id());
1233 }
1234 CatalogItem::MaterializedView(mv) => {
1235 let mv_gid = mv.global_id_writes();
1236 if let Some(target_id) = mv.replacement_target {
1237 let target_gid = state.get_entry(&target_id).latest_global_id();
1238 let shard_id =
1239 state.storage_metadata().get_collection_shard(target_gid)?;
1240 storage_collections_to_register.insert(mv_gid, shard_id);
1241 } else {
1242 storage_collections_to_create.insert(mv_gid);
1243 }
1244 }
1245 CatalogItem::ContinualTask(ct) => {
1246 storage_collections_to_create.insert(ct.global_id());
1247 }
1248 CatalogItem::Sink(sink) => {
1249 storage_collections_to_create.insert(sink.global_id());
1250 }
1251 CatalogItem::Log(_)
1252 | CatalogItem::View(_)
1253 | CatalogItem::Index(_)
1254 | CatalogItem::Type(_)
1255 | CatalogItem::Func(_)
1256 | CatalogItem::Secret(_)
1257 | CatalogItem::Connection(_) => (),
1258 }
1259
1260 let system_user = session.map_or(false, |s| s.user().is_system_user());
1261 if !system_user {
1262 if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1263 let cluster_name = state.clusters_by_id[&id].name.clone();
1264 return Err(AdapterError::Catalog(Error::new(
1265 ErrorKind::ReadOnlyCluster(cluster_name),
1266 )));
1267 }
1268 }
1269
1270 let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1271 let default_privileges = state
1272 .default_privileges
1273 .get_applicable_privileges(
1274 owner_id,
1275 name.qualifiers.database_spec.id(),
1276 Some(name.qualifiers.schema_spec.into()),
1277 item.typ().into(),
1278 )
1279 .map(|item| item.mz_acl_item(owner_id));
1280 let progress_source_privilege = if item.is_progress_source() {
1282 Some(MzAclItem {
1283 grantee: MZ_SUPPORT_ROLE_ID,
1284 grantor: owner_id,
1285 acl_mode: AclMode::SELECT,
1286 })
1287 } else {
1288 None
1289 };
1290 let privileges: Vec<_> = merge_mz_acl_items(
1291 owner_privileges
1292 .into_iter()
1293 .chain(default_privileges)
1294 .chain(progress_source_privilege),
1295 )
1296 .collect();
1297
1298 let temporary_oids = state.get_temporary_oids().collect();
1299
1300 if item.is_temporary() {
1301 if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1302 || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1303 {
1304 return Err(AdapterError::Catalog(Error::new(
1305 ErrorKind::InvalidTemporarySchema,
1306 )));
1307 }
1308 let oid = tx.allocate_oid(&temporary_oids)?;
1309
1310 let schema_id = name.qualifiers.schema_spec.clone().into();
1311 let item_type = item.typ();
1312 let (create_sql, global_id, versions) = item.to_serialized();
1313
1314 let item = TemporaryItem {
1315 id,
1316 oid,
1317 global_id,
1318 schema_id,
1319 name: name.item.clone(),
1320 create_sql,
1321 conn_id: item.conn_id().cloned(),
1322 owner_id,
1323 privileges: privileges.clone(),
1324 extra_versions: versions,
1325 };
1326 temporary_item_updates.push((item, StateDiff::Addition));
1327
1328 info!(
1329 "create temporary {} {} ({})",
1330 item_type,
1331 state.resolve_full_name(&name, None),
1332 id
1333 );
1334 } else {
1335 if let Some(temp_id) =
1336 item.uses()
1337 .iter()
1338 .find(|id| match state.try_get_entry(*id) {
1339 Some(entry) => entry.item().is_temporary(),
1340 None => temporary_ids.contains(id),
1341 })
1342 {
1343 let temp_item = state.get_entry(temp_id);
1344 return Err(AdapterError::Catalog(Error::new(
1345 ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1346 )));
1347 }
1348 if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1349 && !system_user
1350 {
1351 let schema_name = state
1352 .resolve_full_name(&name, session.map(|session| session.conn_id()))
1353 .schema;
1354 return Err(AdapterError::Catalog(Error::new(
1355 ErrorKind::ReadOnlySystemSchema(schema_name),
1356 )));
1357 }
1358 let schema_id = name.qualifiers.schema_spec.clone().into();
1359 let item_type = item.typ();
1360 let (create_sql, global_id, versions) = item.to_serialized();
1361 tx.insert_user_item(
1362 id,
1363 global_id,
1364 schema_id,
1365 &name.item,
1366 create_sql,
1367 owner_id,
1368 privileges.clone(),
1369 &temporary_oids,
1370 versions,
1371 )?;
1372 info!(
1373 "create {} {} ({})",
1374 item_type,
1375 state.resolve_full_name(&name, None),
1376 id
1377 );
1378 }
1379
1380 if Self::should_audit_log_item(&item) {
1381 let name = Self::full_name_detail(
1382 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1383 );
1384 let details = match &item {
1385 CatalogItem::Source(s) => {
1386 let cluster_id = match s.data_source {
1387 DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1390 match state.get_entry(&ingestion_id).cluster_id() {
1391 Some(cluster_id) => Some(cluster_id.to_string()),
1392 None => None,
1393 }
1394 }
1395 _ => match item.cluster_id() {
1396 Some(cluster_id) => Some(cluster_id.to_string()),
1397 None => None,
1398 },
1399 };
1400
1401 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1402 id: id.to_string(),
1403 cluster_id,
1404 name,
1405 external_type: s.source_type().to_string(),
1406 })
1407 }
1408 CatalogItem::Sink(s) => {
1409 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1410 id: id.to_string(),
1411 cluster_id: Some(s.cluster_id.to_string()),
1412 name,
1413 external_type: s.sink_type().to_string(),
1414 })
1415 }
1416 CatalogItem::Index(i) => {
1417 EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1418 id: id.to_string(),
1419 name,
1420 cluster_id: i.cluster_id.to_string(),
1421 })
1422 }
1423 CatalogItem::MaterializedView(mv) => {
1424 EventDetails::CreateMaterializedViewV1(
1425 mz_audit_log::CreateMaterializedViewV1 {
1426 id: id.to_string(),
1427 name,
1428 cluster_id: mv.cluster_id.to_string(),
1429 replacement_target_id: mv
1430 .replacement_target
1431 .map(|id| id.to_string()),
1432 },
1433 )
1434 }
1435 _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1436 id: id.to_string(),
1437 name,
1438 }),
1439 };
1440 CatalogState::add_to_audit_log(
1441 &state.system_configuration,
1442 oracle_write_ts,
1443 session,
1444 tx,
1445 audit_events,
1446 EventType::Create,
1447 catalog_type_to_audit_object_type(item.typ()),
1448 details,
1449 )?;
1450 }
1451 }
1452 Op::CreateNetworkPolicy {
1453 rules,
1454 name,
1455 owner_id,
1456 } => {
1457 if state.network_policies_by_name.contains_key(&name) {
1458 return Err(AdapterError::PlanError(PlanError::Catalog(
1459 SqlCatalogError::NetworkPolicyAlreadyExists(name),
1460 )));
1461 }
1462 if is_reserved_name(&name) {
1463 return Err(AdapterError::Catalog(Error::new(
1464 ErrorKind::ReservedNetworkPolicyName(name),
1465 )));
1466 }
1467
1468 let owner_privileges = vec![rbac::owner_privilege(
1469 mz_sql::catalog::ObjectType::NetworkPolicy,
1470 owner_id,
1471 )];
1472 let default_privileges = state
1473 .default_privileges
1474 .get_applicable_privileges(
1475 owner_id,
1476 None,
1477 None,
1478 mz_sql::catalog::ObjectType::NetworkPolicy,
1479 )
1480 .map(|item| item.mz_acl_item(owner_id));
1481 let privileges: Vec<_> =
1482 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1483 .collect();
1484
1485 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1486 let id = tx.insert_user_network_policy(
1487 name.clone(),
1488 rules,
1489 privileges,
1490 owner_id,
1491 &temporary_oids,
1492 )?;
1493
1494 CatalogState::add_to_audit_log(
1495 &state.system_configuration,
1496 oracle_write_ts,
1497 session,
1498 tx,
1499 audit_events,
1500 EventType::Create,
1501 ObjectType::NetworkPolicy,
1502 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1503 id: id.to_string(),
1504 name: name.clone(),
1505 }),
1506 )?;
1507
1508 info!("created network policy {name} ({id})");
1509 }
1510 Op::Comment {
1511 object_id,
1512 sub_component,
1513 comment,
1514 } => {
1515 tx.update_comment(object_id, sub_component, comment)?;
1516 let entry = state.get_comment_id_entry(&object_id);
1517 let should_log = entry
1518 .map(|entry| Self::should_audit_log_item(entry.item()))
1519 .unwrap_or(true);
1521 if let (Some(conn_id), true) =
1524 (session.map(|session| session.conn_id()), should_log)
1525 {
1526 CatalogState::add_to_audit_log(
1527 &state.system_configuration,
1528 oracle_write_ts,
1529 session,
1530 tx,
1531 audit_events,
1532 EventType::Comment,
1533 comment_id_to_audit_object_type(object_id),
1534 EventDetails::IdNameV1(IdNameV1 {
1535 id: format!("{object_id:?}"),
1537 name: state.comment_id_to_audit_log_name(object_id, conn_id),
1538 }),
1539 )?;
1540 }
1541 }
1542 Op::UpdateSourceReferences {
1543 source_id,
1544 references,
1545 } => {
1546 tx.update_source_references(
1547 source_id,
1548 references
1549 .references
1550 .into_iter()
1551 .map(|reference| reference.into())
1552 .collect(),
1553 references.updated_at,
1554 )?;
1555 }
1556 Op::DropObjects(drop_object_infos) => {
1557 let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1559
1560 tx.drop_comments(&delta.comments)?;
1562
1563 let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1565 delta
1566 .items
1567 .iter()
1568 .map(|id| id)
1569 .partition(|id| !state.get_entry(*id).item().is_temporary());
1570 tx.remove_items(&durable_items_to_drop)?;
1571 temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1572 let entry = state.get_entry(&id);
1573 (entry.clone().into(), StateDiff::Retraction)
1574 }));
1575
1576 for item_id in delta.items {
1577 let entry = state.get_entry(&item_id);
1578
1579 if entry.item().is_storage_collection() {
1580 storage_collections_to_drop.extend(entry.global_ids());
1581 }
1582
1583 if state.source_references.contains_key(&item_id) {
1584 tx.remove_source_references(item_id)?;
1585 }
1586
1587 if Self::should_audit_log_item(entry.item()) {
1588 CatalogState::add_to_audit_log(
1589 &state.system_configuration,
1590 oracle_write_ts,
1591 session,
1592 tx,
1593 audit_events,
1594 EventType::Drop,
1595 catalog_type_to_audit_object_type(entry.item().typ()),
1596 EventDetails::IdFullNameV1(IdFullNameV1 {
1597 id: item_id.to_string(),
1598 name: Self::full_name_detail(&state.resolve_full_name(
1599 entry.name(),
1600 session.map(|session| session.conn_id()),
1601 )),
1602 }),
1603 )?;
1604 }
1605 info!(
1606 "drop {} {} ({})",
1607 entry.item_type(),
1608 state.resolve_full_name(entry.name(), entry.conn_id()),
1609 item_id
1610 );
1611 }
1612
1613 let schemas = delta
1615 .schemas
1616 .iter()
1617 .map(|(schema_spec, database_spec)| {
1618 (SchemaId::from(schema_spec), *database_spec)
1619 })
1620 .collect();
1621 tx.remove_schemas(&schemas)?;
1622
1623 for (schema_spec, database_spec) in delta.schemas {
1624 let schema = state.get_schema(
1625 &database_spec,
1626 &schema_spec,
1627 session
1628 .map(|session| session.conn_id())
1629 .unwrap_or(&SYSTEM_CONN_ID),
1630 );
1631
1632 let schema_id = SchemaId::from(schema_spec);
1633 let database_id = match database_spec {
1634 ResolvedDatabaseSpecifier::Ambient => None,
1635 ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1636 };
1637
1638 CatalogState::add_to_audit_log(
1639 &state.system_configuration,
1640 oracle_write_ts,
1641 session,
1642 tx,
1643 audit_events,
1644 EventType::Drop,
1645 ObjectType::Schema,
1646 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1647 id: schema_id.to_string(),
1648 name: schema.name.schema.to_string(),
1649 database_name: database_id
1650 .map(|database_id| state.database_by_id[&database_id].name.clone()),
1651 }),
1652 )?;
1653 }
1654
1655 tx.remove_databases(&delta.databases)?;
1657
1658 for database_id in delta.databases {
1659 let database = state.get_database(&database_id).clone();
1660
1661 CatalogState::add_to_audit_log(
1662 &state.system_configuration,
1663 oracle_write_ts,
1664 session,
1665 tx,
1666 audit_events,
1667 EventType::Drop,
1668 ObjectType::Database,
1669 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1670 id: database_id.to_string(),
1671 name: database.name.clone(),
1672 }),
1673 )?;
1674 }
1675
1676 tx.remove_user_roles(&delta.roles)?;
1678
1679 for role_id in delta.roles {
1680 let role = state
1681 .roles_by_id
1682 .get(&role_id)
1683 .expect("catalog out of sync");
1684
1685 CatalogState::add_to_audit_log(
1686 &state.system_configuration,
1687 oracle_write_ts,
1688 session,
1689 tx,
1690 audit_events,
1691 EventType::Drop,
1692 ObjectType::Role,
1693 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1694 id: role.id.to_string(),
1695 name: role.name.clone(),
1696 }),
1697 )?;
1698 info!("drop role {}", role.name());
1699 }
1700
1701 tx.remove_network_policies(&delta.network_policies)?;
1703
1704 for network_policy_id in delta.network_policies {
1705 let policy = state
1706 .network_policies_by_id
1707 .get(&network_policy_id)
1708 .expect("catalog out of sync");
1709
1710 CatalogState::add_to_audit_log(
1711 &state.system_configuration,
1712 oracle_write_ts,
1713 session,
1714 tx,
1715 audit_events,
1716 EventType::Drop,
1717 ObjectType::NetworkPolicy,
1718 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1719 id: policy.id.to_string(),
1720 name: policy.name.clone(),
1721 }),
1722 )?;
1723 info!("drop network policy {}", policy.name.clone());
1724 }
1725
1726 let replicas = delta.replicas.keys().copied().collect();
1728 tx.remove_cluster_replicas(&replicas)?;
1729
1730 for (replica_id, (cluster_id, reason)) in delta.replicas {
1731 let cluster = state.get_cluster(cluster_id);
1732 let replica = cluster.replica(replica_id).expect("Must exist");
1733
1734 let (reason, scheduling_policies) = reason.into_audit_log();
1735 let details =
1736 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1737 cluster_id: cluster_id.to_string(),
1738 cluster_name: cluster.name.clone(),
1739 replica_id: Some(replica_id.to_string()),
1740 replica_name: replica.name.clone(),
1741 reason,
1742 scheduling_policies,
1743 });
1744 CatalogState::add_to_audit_log(
1745 &state.system_configuration,
1746 oracle_write_ts,
1747 session,
1748 tx,
1749 audit_events,
1750 EventType::Drop,
1751 ObjectType::ClusterReplica,
1752 details,
1753 )?;
1754 }
1755
1756 tx.remove_clusters(&delta.clusters)?;
1758
1759 for cluster_id in delta.clusters {
1760 let cluster = state.get_cluster(cluster_id);
1761
1762 CatalogState::add_to_audit_log(
1763 &state.system_configuration,
1764 oracle_write_ts,
1765 session,
1766 tx,
1767 audit_events,
1768 EventType::Drop,
1769 ObjectType::Cluster,
1770 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1771 id: cluster.id.to_string(),
1772 name: cluster.name.clone(),
1773 }),
1774 )?;
1775 }
1776 }
1777 Op::GrantRole {
1778 role_id,
1779 member_id,
1780 grantor_id,
1781 } => {
1782 state.ensure_not_reserved_role(&member_id)?;
1783 state.ensure_grantable_role(&role_id)?;
1784 if state.collect_role_membership(&role_id).contains(&member_id) {
1785 let group_role = state.get_role(&role_id);
1786 let member_role = state.get_role(&member_id);
1787 return Err(AdapterError::Catalog(Error::new(
1788 ErrorKind::CircularRoleMembership {
1789 role_name: group_role.name().to_string(),
1790 member_name: member_role.name().to_string(),
1791 },
1792 )));
1793 }
1794 let mut member_role = state.get_role(&member_id).clone();
1795 member_role.membership.map.insert(role_id, grantor_id);
1796 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1797
1798 CatalogState::add_to_audit_log(
1799 &state.system_configuration,
1800 oracle_write_ts,
1801 session,
1802 tx,
1803 audit_events,
1804 EventType::Grant,
1805 ObjectType::Role,
1806 EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1807 role_id: role_id.to_string(),
1808 member_id: member_id.to_string(),
1809 grantor_id: grantor_id.to_string(),
1810 executed_by: session
1811 .map(|session| session.authenticated_role_id())
1812 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1813 .to_string(),
1814 }),
1815 )?;
1816 }
1817 Op::RevokeRole {
1818 role_id,
1819 member_id,
1820 grantor_id,
1821 } => {
1822 state.ensure_not_reserved_role(&member_id)?;
1823 state.ensure_grantable_role(&role_id)?;
1824 let mut member_role = state.get_role(&member_id).clone();
1825 member_role.membership.map.remove(&role_id);
1826 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1827
1828 CatalogState::add_to_audit_log(
1829 &state.system_configuration,
1830 oracle_write_ts,
1831 session,
1832 tx,
1833 audit_events,
1834 EventType::Revoke,
1835 ObjectType::Role,
1836 EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1837 role_id: role_id.to_string(),
1838 member_id: member_id.to_string(),
1839 grantor_id: grantor_id.to_string(),
1840 executed_by: session
1841 .map(|session| session.authenticated_role_id())
1842 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1843 .to_string(),
1844 }),
1845 )?;
1846 }
1847 Op::UpdatePrivilege {
1848 target_id,
1849 privilege,
1850 variant,
1851 } => {
1852 let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1853 UpdatePrivilegeVariant::Grant => {
1854 privileges.grant(privilege);
1855 }
1856 UpdatePrivilegeVariant::Revoke => {
1857 privileges.revoke(&privilege);
1858 }
1859 };
1860 match &target_id {
1861 SystemObjectId::Object(object_id) => match object_id {
1862 ObjectId::Cluster(id) => {
1863 let mut cluster = state.get_cluster(*id).clone();
1864 update_privilege_fn(&mut cluster.privileges);
1865 tx.update_cluster(*id, cluster.into())?;
1866 }
1867 ObjectId::Database(id) => {
1868 let mut database = state.get_database(id).clone();
1869 update_privilege_fn(&mut database.privileges);
1870 tx.update_database(*id, database.into())?;
1871 }
1872 ObjectId::NetworkPolicy(id) => {
1873 let mut policy = state.get_network_policy(id).clone();
1874 update_privilege_fn(&mut policy.privileges);
1875 tx.update_network_policy(*id, policy.into())?;
1876 }
1877 ObjectId::Schema((database_spec, schema_spec)) => {
1878 let schema_id = schema_spec.clone().into();
1879 let mut schema = state
1880 .get_schema(
1881 database_spec,
1882 schema_spec,
1883 session
1884 .map(|session| session.conn_id())
1885 .unwrap_or(&SYSTEM_CONN_ID),
1886 )
1887 .clone();
1888 update_privilege_fn(&mut schema.privileges);
1889 tx.update_schema(schema_id, schema.into())?;
1890 }
1891 ObjectId::Item(id) => {
1892 let entry = state.get_entry(id);
1893 let mut new_entry = entry.clone();
1894 update_privilege_fn(&mut new_entry.privileges);
1895 if !new_entry.item().is_temporary() {
1896 tx.update_item(*id, new_entry.into())?;
1897 } else {
1898 temporary_item_updates
1899 .push((entry.clone().into(), StateDiff::Retraction));
1900 temporary_item_updates
1901 .push((new_entry.into(), StateDiff::Addition));
1902 }
1903 }
1904 ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
1905 },
1906 SystemObjectId::System => {
1907 let mut system_privileges = state.system_privileges.clone();
1908 update_privilege_fn(&mut system_privileges);
1909 let new_privilege =
1910 system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
1911 tx.set_system_privilege(
1912 privilege.grantee,
1913 privilege.grantor,
1914 new_privilege.map(|new_privilege| new_privilege.acl_mode),
1915 )?;
1916 }
1917 }
1918 let object_type = state.get_system_object_type(&target_id);
1919 let object_id_str = match &target_id {
1920 SystemObjectId::System => "SYSTEM".to_string(),
1921 SystemObjectId::Object(id) => id.to_string(),
1922 };
1923 CatalogState::add_to_audit_log(
1924 &state.system_configuration,
1925 oracle_write_ts,
1926 session,
1927 tx,
1928 audit_events,
1929 variant.into(),
1930 system_object_type_to_audit_object_type(&object_type),
1931 EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
1932 object_id: object_id_str,
1933 grantee_id: privilege.grantee.to_string(),
1934 grantor_id: privilege.grantor.to_string(),
1935 privileges: privilege.acl_mode.to_string(),
1936 }),
1937 )?;
1938 }
1939 Op::UpdateDefaultPrivilege {
1940 privilege_object,
1941 privilege_acl_item,
1942 variant,
1943 } => {
1944 let mut default_privileges = state.default_privileges.clone();
1945 match variant {
1946 UpdatePrivilegeVariant::Grant => default_privileges
1947 .grant(privilege_object.clone(), privilege_acl_item.clone()),
1948 UpdatePrivilegeVariant::Revoke => {
1949 default_privileges.revoke(&privilege_object, &privilege_acl_item)
1950 }
1951 }
1952 let new_acl_mode = default_privileges
1953 .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
1954 tx.set_default_privilege(
1955 privilege_object.role_id,
1956 privilege_object.database_id,
1957 privilege_object.schema_id,
1958 privilege_object.object_type,
1959 privilege_acl_item.grantee,
1960 new_acl_mode.cloned(),
1961 )?;
1962 CatalogState::add_to_audit_log(
1963 &state.system_configuration,
1964 oracle_write_ts,
1965 session,
1966 tx,
1967 audit_events,
1968 variant.into(),
1969 object_type_to_audit_object_type(privilege_object.object_type),
1970 EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
1971 role_id: privilege_object.role_id.to_string(),
1972 database_id: privilege_object.database_id.map(|id| id.to_string()),
1973 schema_id: privilege_object.schema_id.map(|id| id.to_string()),
1974 grantee_id: privilege_acl_item.grantee.to_string(),
1975 privileges: privilege_acl_item.acl_mode.to_string(),
1976 }),
1977 )?;
1978 }
1979 Op::RenameCluster {
1980 id,
1981 name,
1982 to_name,
1983 check_reserved_names,
1984 } => {
1985 if id.is_system() {
1986 return Err(AdapterError::Catalog(Error::new(
1987 ErrorKind::ReadOnlyCluster(name.clone()),
1988 )));
1989 }
1990 if check_reserved_names && is_reserved_name(&to_name) {
1991 return Err(AdapterError::Catalog(Error::new(
1992 ErrorKind::ReservedClusterName(to_name),
1993 )));
1994 }
1995 tx.rename_cluster(id, &name, &to_name)?;
1996 CatalogState::add_to_audit_log(
1997 &state.system_configuration,
1998 oracle_write_ts,
1999 session,
2000 tx,
2001 audit_events,
2002 EventType::Alter,
2003 ObjectType::Cluster,
2004 EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
2005 id: id.to_string(),
2006 old_name: name.clone(),
2007 new_name: to_name.clone(),
2008 }),
2009 )?;
2010 info!("rename cluster {name} to {to_name}");
2011 }
2012 Op::RenameClusterReplica {
2013 cluster_id,
2014 replica_id,
2015 name,
2016 to_name,
2017 } => {
2018 if is_reserved_name(&to_name) {
2019 return Err(AdapterError::Catalog(Error::new(
2020 ErrorKind::ReservedReplicaName(to_name),
2021 )));
2022 }
2023 tx.rename_cluster_replica(replica_id, &name, &to_name)?;
2024 CatalogState::add_to_audit_log(
2025 &state.system_configuration,
2026 oracle_write_ts,
2027 session,
2028 tx,
2029 audit_events,
2030 EventType::Alter,
2031 ObjectType::ClusterReplica,
2032 EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
2033 cluster_id: cluster_id.to_string(),
2034 replica_id: replica_id.to_string(),
2035 old_name: name.replica.as_str().to_string(),
2036 new_name: to_name.clone(),
2037 }),
2038 )?;
2039 info!("rename cluster replica {name} to {to_name}");
2040 }
2041 Op::RenameItem {
2042 id,
2043 to_name,
2044 current_full_name,
2045 } => {
2046 let mut updates = Vec::new();
2047
2048 let entry = state.get_entry(&id);
2049 if let CatalogItem::Type(_) = entry.item() {
2050 return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
2051 current_full_name.to_string(),
2052 ))));
2053 }
2054
2055 if entry.id().is_system() {
2056 let name = state
2057 .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2058 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2059 name.to_string(),
2060 ))));
2061 }
2062
2063 let mut to_full_name = current_full_name.clone();
2064 to_full_name.item.clone_from(&to_name);
2065
2066 let mut to_qualified_name = entry.name().clone();
2067 to_qualified_name.item.clone_from(&to_name);
2068
2069 let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2070 id: id.to_string(),
2071 old_name: Self::full_name_detail(¤t_full_name),
2072 new_name: Self::full_name_detail(&to_full_name),
2073 });
2074 if Self::should_audit_log_item(entry.item()) {
2075 CatalogState::add_to_audit_log(
2076 &state.system_configuration,
2077 oracle_write_ts,
2078 session,
2079 tx,
2080 audit_events,
2081 EventType::Alter,
2082 catalog_type_to_audit_object_type(entry.item().typ()),
2083 details,
2084 )?;
2085 }
2086
2087 let mut new_entry = entry.clone();
2089 new_entry.name.item.clone_from(&to_name);
2090 new_entry.item = entry
2091 .item()
2092 .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2093 .map_err(|e| {
2094 Error::new(ErrorKind::from(AmbiguousRename {
2095 depender: state
2096 .resolve_full_name(entry.name(), entry.conn_id())
2097 .to_string(),
2098 dependee: state
2099 .resolve_full_name(entry.name(), entry.conn_id())
2100 .to_string(),
2101 message: e,
2102 }))
2103 })?;
2104
2105 for id in entry.referenced_by() {
2106 let dependent_item = state.get_entry(id);
2107 let mut to_entry = dependent_item.clone();
2108 to_entry.item = dependent_item
2109 .item()
2110 .rename_item_refs(
2111 current_full_name.clone(),
2112 to_full_name.item.clone(),
2113 false,
2114 )
2115 .map_err(|e| {
2116 Error::new(ErrorKind::from(AmbiguousRename {
2117 depender: state
2118 .resolve_full_name(
2119 dependent_item.name(),
2120 dependent_item.conn_id(),
2121 )
2122 .to_string(),
2123 dependee: state
2124 .resolve_full_name(entry.name(), entry.conn_id())
2125 .to_string(),
2126 message: e,
2127 }))
2128 })?;
2129
2130 if !to_entry.item().is_temporary() {
2131 tx.update_item(*id, to_entry.into())?;
2132 } else {
2133 temporary_item_updates
2134 .push((dependent_item.clone().into(), StateDiff::Retraction));
2135 temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2136 }
2137 updates.push(*id);
2138 }
2139 if !new_entry.item().is_temporary() {
2140 tx.update_item(id, new_entry.into())?;
2141 } else {
2142 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2143 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2144 }
2145
2146 updates.push(id);
2147 for id in updates {
2148 Self::log_update(state, &id);
2149 }
2150 }
2151 Op::RenameSchema {
2152 database_spec,
2153 schema_spec,
2154 new_name,
2155 check_reserved_names,
2156 } => {
2157 if check_reserved_names && is_reserved_name(&new_name) {
2158 return Err(AdapterError::Catalog(Error::new(
2159 ErrorKind::ReservedSchemaName(new_name),
2160 )));
2161 }
2162
2163 let conn_id = session
2164 .map(|session| session.conn_id())
2165 .unwrap_or(&SYSTEM_CONN_ID);
2166
2167 let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2168 let cur_name = schema.name().schema.clone();
2169
2170 let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2171 return Err(AdapterError::Catalog(Error::new(
2172 ErrorKind::AmbientSchemaRename(cur_name),
2173 )));
2174 };
2175 let database = state.get_database(&database_id);
2176 let database_name = &database.name;
2177
2178 let mut updates = Vec::new();
2179 let mut items_to_update = BTreeMap::new();
2180
2181 let mut update_item = |id| {
2182 if items_to_update.contains_key(id) {
2183 return Ok(());
2184 }
2185
2186 let entry = state.get_entry(id);
2187
2188 let mut new_entry = entry.clone();
2190 new_entry.item = entry
2191 .item
2192 .rename_schema_refs(database_name, &cur_name, &new_name)
2193 .map_err(|(s, _i)| {
2194 Error::new(ErrorKind::from(AmbiguousRename {
2195 depender: state
2196 .resolve_full_name(entry.name(), entry.conn_id())
2197 .to_string(),
2198 dependee: format!("{database_name}.{cur_name}"),
2199 message: format!("ambiguous reference to schema named {s}"),
2200 }))
2201 })?;
2202
2203 if !new_entry.item().is_temporary() {
2205 items_to_update.insert(*id, new_entry.into());
2206 } else {
2207 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2208 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2209 }
2210 updates.push(id);
2211
2212 Ok::<_, AdapterError>(())
2213 };
2214
2215 for (_name, item_id) in &schema.items {
2217 update_item(item_id)?;
2219
2220 for id in state.get_entry(item_id).referenced_by() {
2222 update_item(id)?;
2223 }
2224 }
2225 tx.update_items(items_to_update)?;
2228
2229 let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2231 let schema_name = schema.name().schema.clone();
2232 return Err(AdapterError::Catalog(crate::catalog::Error::new(
2233 crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2234 )));
2235 };
2236
2237 let database_name = database_spec
2239 .id()
2240 .map(|id| state.get_database(&id).name.clone());
2241 let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2242 id: schema_id.to_string(),
2243 old_name: schema.name().schema.clone(),
2244 new_name: new_name.clone(),
2245 database_name,
2246 });
2247 CatalogState::add_to_audit_log(
2248 &state.system_configuration,
2249 oracle_write_ts,
2250 session,
2251 tx,
2252 audit_events,
2253 EventType::Alter,
2254 mz_audit_log::ObjectType::Schema,
2255 details,
2256 )?;
2257
2258 let mut new_schema = schema.clone();
2260 new_schema.name.schema.clone_from(&new_name);
2261 tx.update_schema(schema_id, new_schema.into())?;
2262
2263 for id in updates {
2264 Self::log_update(state, id);
2265 }
2266 }
2267 Op::UpdateOwner { id, new_owner } => {
2268 let conn_id = session
2269 .map(|session| session.conn_id())
2270 .unwrap_or(&SYSTEM_CONN_ID);
2271 let old_owner = state
2272 .get_owner_id(&id, conn_id)
2273 .expect("cannot update the owner of an object without an owner");
2274 match &id {
2275 ObjectId::Cluster(id) => {
2276 let mut cluster = state.get_cluster(*id).clone();
2277 if id.is_system() {
2278 return Err(AdapterError::Catalog(Error::new(
2279 ErrorKind::ReadOnlyCluster(cluster.name),
2280 )));
2281 }
2282 Self::update_privilege_owners(
2283 &mut cluster.privileges,
2284 cluster.owner_id,
2285 new_owner,
2286 );
2287 cluster.owner_id = new_owner;
2288 tx.update_cluster(*id, cluster.into())?;
2289 }
2290 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2291 let cluster = state.get_cluster(*cluster_id);
2292 let mut replica = cluster
2293 .replica(*replica_id)
2294 .expect("catalog out of sync")
2295 .clone();
2296 if replica_id.is_system() {
2297 return Err(AdapterError::Catalog(Error::new(
2298 ErrorKind::ReadOnlyClusterReplica(replica.name),
2299 )));
2300 }
2301 replica.owner_id = new_owner;
2302 tx.update_cluster_replica(*replica_id, replica.into())?;
2303 }
2304 ObjectId::Database(id) => {
2305 let mut database = state.get_database(id).clone();
2306 if id.is_system() {
2307 return Err(AdapterError::Catalog(Error::new(
2308 ErrorKind::ReadOnlyDatabase(database.name),
2309 )));
2310 }
2311 Self::update_privilege_owners(
2312 &mut database.privileges,
2313 database.owner_id,
2314 new_owner,
2315 );
2316 database.owner_id = new_owner;
2317 tx.update_database(*id, database.clone().into())?;
2318 }
2319 ObjectId::Schema((database_spec, schema_spec)) => {
2320 let schema_id: SchemaId = schema_spec.clone().into();
2321 let mut schema = state
2322 .get_schema(database_spec, schema_spec, conn_id)
2323 .clone();
2324 if schema_id.is_system() {
2325 let name = schema.name();
2326 let full_name = state.resolve_full_schema_name(name);
2327 return Err(AdapterError::Catalog(Error::new(
2328 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2329 )));
2330 }
2331 Self::update_privilege_owners(
2332 &mut schema.privileges,
2333 schema.owner_id,
2334 new_owner,
2335 );
2336 schema.owner_id = new_owner;
2337 tx.update_schema(schema_id, schema.into())?;
2338 }
2339 ObjectId::Item(id) => {
2340 let entry = state.get_entry(id);
2341 let mut new_entry = entry.clone();
2342 if id.is_system() {
2343 let full_name = state.resolve_full_name(
2344 new_entry.name(),
2345 session.map(|session| session.conn_id()),
2346 );
2347 return Err(AdapterError::Catalog(Error::new(
2348 ErrorKind::ReadOnlyItem(full_name.to_string()),
2349 )));
2350 }
2351 Self::update_privilege_owners(
2352 &mut new_entry.privileges,
2353 new_entry.owner_id,
2354 new_owner,
2355 );
2356 new_entry.owner_id = new_owner;
2357 if !new_entry.item().is_temporary() {
2358 tx.update_item(*id, new_entry.into())?;
2359 } else {
2360 temporary_item_updates
2361 .push((entry.clone().into(), StateDiff::Retraction));
2362 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2363 }
2364 }
2365 ObjectId::NetworkPolicy(id) => {
2366 let mut policy = state.get_network_policy(id).clone();
2367 if id.is_system() {
2368 return Err(AdapterError::Catalog(Error::new(
2369 ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2370 )));
2371 }
2372 Self::update_privilege_owners(
2373 &mut policy.privileges,
2374 policy.owner_id,
2375 new_owner,
2376 );
2377 policy.owner_id = new_owner;
2378 tx.update_network_policy(*id, policy.into())?;
2379 }
2380 ObjectId::Role(_) => unreachable!("roles have no owner"),
2381 }
2382 let object_type = state.get_object_type(&id);
2383 CatalogState::add_to_audit_log(
2384 &state.system_configuration,
2385 oracle_write_ts,
2386 session,
2387 tx,
2388 audit_events,
2389 EventType::Alter,
2390 object_type_to_audit_object_type(object_type),
2391 EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2392 object_id: id.to_string(),
2393 old_owner_id: old_owner.to_string(),
2394 new_owner_id: new_owner.to_string(),
2395 }),
2396 )?;
2397 }
2398 Op::UpdateClusterConfig { id, name, config } => {
2399 let mut cluster = state.get_cluster(id).clone();
2400 cluster.config = config;
2401 tx.update_cluster(id, cluster.into())?;
2402 info!("update cluster {}", name);
2403
2404 CatalogState::add_to_audit_log(
2405 &state.system_configuration,
2406 oracle_write_ts,
2407 session,
2408 tx,
2409 audit_events,
2410 EventType::Alter,
2411 ObjectType::Cluster,
2412 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2413 id: id.to_string(),
2414 name,
2415 }),
2416 )?;
2417 }
2418 Op::UpdateClusterReplicaConfig {
2419 replica_id,
2420 cluster_id,
2421 config,
2422 } => {
2423 let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2424 info!("update replica {}", replica.name);
2425 tx.update_cluster_replica(
2426 replica_id,
2427 mz_catalog::durable::ClusterReplica {
2428 cluster_id,
2429 replica_id,
2430 name: replica.name.clone(),
2431 config: config.clone().into(),
2432 owner_id: replica.owner_id,
2433 },
2434 )?;
2435 }
2436 Op::UpdateItem { id, name, to_item } => {
2437 let mut entry = state.get_entry(&id).clone();
2438 entry.name = name.clone();
2439 entry.item = to_item.clone();
2440 tx.update_item(id, entry.into())?;
2441
2442 if Self::should_audit_log_item(&to_item) {
2443 let mut full_name = Self::full_name_detail(
2444 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2445 );
2446 full_name.item = name.item;
2447
2448 CatalogState::add_to_audit_log(
2449 &state.system_configuration,
2450 oracle_write_ts,
2451 session,
2452 tx,
2453 audit_events,
2454 EventType::Alter,
2455 catalog_type_to_audit_object_type(to_item.typ()),
2456 EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2457 id: id.to_string(),
2458 name: full_name,
2459 }),
2460 )?;
2461 }
2462
2463 Self::log_update(state, &id);
2464 }
2465 Op::UpdateSystemConfiguration { name, value } => {
2466 let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2467 tx.upsert_system_config(&name, parsed_value.clone())?;
2468 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2473 let with_0dt_deployment_max_wait =
2474 Duration::parse(VarInput::Flat(&parsed_value))
2475 .expect("parsing succeeded above");
2476 tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2477 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2478 let with_0dt_deployment_ddl_check_interval =
2479 Duration::parse(VarInput::Flat(&parsed_value))
2480 .expect("parsing succeeded above");
2481 tx.set_0dt_deployment_ddl_check_interval(
2482 with_0dt_deployment_ddl_check_interval,
2483 )?;
2484 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2485 let panic_after_timeout =
2486 strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2487 tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2488 }
2489
2490 CatalogState::add_to_audit_log(
2491 &state.system_configuration,
2492 oracle_write_ts,
2493 session,
2494 tx,
2495 audit_events,
2496 EventType::Alter,
2497 ObjectType::System,
2498 EventDetails::SetV1(mz_audit_log::SetV1 {
2499 name,
2500 value: Some(value.borrow().to_vec().join(", ")),
2501 }),
2502 )?;
2503 }
2504 Op::ResetSystemConfiguration { name } => {
2505 tx.remove_system_config(&name);
2506 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2511 tx.reset_0dt_deployment_max_wait()?;
2512 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2513 tx.reset_0dt_deployment_ddl_check_interval()?;
2514 }
2515
2516 CatalogState::add_to_audit_log(
2517 &state.system_configuration,
2518 oracle_write_ts,
2519 session,
2520 tx,
2521 audit_events,
2522 EventType::Alter,
2523 ObjectType::System,
2524 EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2525 )?;
2526 }
2527 Op::ResetAllSystemConfiguration => {
2528 tx.clear_system_configs();
2529 tx.reset_0dt_deployment_max_wait()?;
2530 tx.reset_0dt_deployment_ddl_check_interval()?;
2531
2532 CatalogState::add_to_audit_log(
2533 &state.system_configuration,
2534 oracle_write_ts,
2535 session,
2536 tx,
2537 audit_events,
2538 EventType::Alter,
2539 ObjectType::System,
2540 EventDetails::ResetAllV1,
2541 )?;
2542 }
2543 Op::WeirdStorageUsageUpdates {
2544 object_id,
2545 size_bytes,
2546 collection_timestamp,
2547 } => {
2548 let id = tx.allocate_storage_usage_ids()?;
2549 let metric =
2550 VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2551 let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2552 let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2553 weird_builtin_table_update = Some(builtin_table_update);
2554 }
2555 };
2556 Ok((weird_builtin_table_update, temporary_item_updates))
2557 }
2558
2559 fn log_update(state: &CatalogState, id: &CatalogItemId) {
2560 let entry = state.get_entry(id);
2561 info!(
2562 "update {} {} ({})",
2563 entry.item_type(),
2564 state.resolve_full_name(entry.name(), entry.conn_id()),
2565 id
2566 );
2567 }
2568
2569 fn update_privilege_owners(
2573 privileges: &mut PrivilegeMap,
2574 old_owner: RoleId,
2575 new_owner: RoleId,
2576 ) {
2577 let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2579
2580 let mut new_present = false;
2581 for privilege in flat_privileges.iter_mut() {
2582 if privilege.grantor == old_owner {
2585 privilege.grantor = new_owner;
2586 } else if privilege.grantor == new_owner {
2587 new_present = true;
2588 }
2589 if privilege.grantee == old_owner {
2591 privilege.grantee = new_owner;
2592 } else if privilege.grantee == new_owner {
2593 new_present = true;
2594 }
2595 }
2596
2597 if new_present {
2601 let privilege_map: BTreeMap<_, Vec<_>> =
2603 flat_privileges
2604 .into_iter()
2605 .fold(BTreeMap::new(), |mut accum, privilege| {
2606 accum
2607 .entry((privilege.grantee, privilege.grantor))
2608 .or_default()
2609 .push(privilege);
2610 accum
2611 });
2612
2613 flat_privileges = privilege_map
2615 .into_iter()
2616 .map(|((grantee, grantor), values)|
2617 values.into_iter().fold(
2619 MzAclItem::empty(grantee, grantor),
2620 |mut accum, mz_aclitem| {
2621 accum.acl_mode =
2622 accum.acl_mode.union(mz_aclitem.acl_mode);
2623 accum
2624 },
2625 ))
2626 .collect();
2627 }
2628
2629 *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2630 }
2631}
2632
2633fn tx_replace_item(
2642 tx: &mut Transaction<'_>,
2643 state: &CatalogState,
2644 id: CatalogItemId,
2645 new_entry: CatalogEntry,
2646) -> Result<(), AdapterError> {
2647 let new_id = new_entry.id;
2648
2649 for use_id in new_entry.referenced_by() {
2651 if tx.get_item(use_id).is_none() {
2653 continue;
2654 }
2655
2656 let mut dependent = state.get_entry(use_id).clone();
2657 dependent.item = dependent.item.replace_item_refs(id, new_id);
2658 tx.update_item(*use_id, dependent.into())?;
2659 }
2660
2661 let old_comment_id = state.get_comment_id(ObjectId::Item(id));
2663 let new_comment_id = new_entry.comment_object_id();
2664 if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
2665 tx.drop_comments(&[old_comment_id].into())?;
2666 for (sub, comment) in comments {
2667 tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
2668 }
2669 }
2670
2671 let mz_catalog::durable::Item {
2672 id: _,
2673 oid,
2674 global_id,
2675 schema_id,
2676 name,
2677 create_sql,
2678 owner_id,
2679 privileges,
2680 extra_versions,
2681 } = new_entry.into();
2682
2683 tx.remove_item(id)?;
2684 tx.insert_item(
2685 new_id,
2686 oid,
2687 global_id,
2688 schema_id,
2689 &name,
2690 create_sql,
2691 owner_id,
2692 privileges,
2693 extra_versions,
2694 )?;
2695
2696 Ok(())
2697}
2698
2699fn apply_replacement_audit_events(
2701 state: &CatalogState,
2702 target: &CatalogEntry,
2703 replacement: &CatalogEntry,
2704) -> Vec<(EventType, EventDetails)> {
2705 let mut events = Vec::new();
2706
2707 let target_name = state.resolve_full_name(target.name(), target.conn_id());
2708 let target_id_name = IdFullNameV1 {
2709 id: target.id().to_string(),
2710 name: Catalog::full_name_detail(&target_name),
2711 };
2712 let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2713 let replacement_id_name = IdFullNameV1 {
2714 id: replacement.id().to_string(),
2715 name: Catalog::full_name_detail(&replacement_name),
2716 };
2717
2718 if Catalog::should_audit_log_item(&replacement.item) {
2719 events.push((
2720 EventType::Drop,
2721 EventDetails::IdFullNameV1(replacement_id_name.clone()),
2722 ));
2723 }
2724
2725 if Catalog::should_audit_log_item(&target.item) {
2726 events.push((
2727 EventType::Alter,
2728 EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2729 target: target_id_name.clone(),
2730 replacement: replacement_id_name,
2731 }),
2732 ));
2733
2734 if let Some(old_cluster_id) = target.cluster_id()
2735 && let Some(new_cluster_id) = replacement.cluster_id()
2736 && old_cluster_id != new_cluster_id
2737 {
2738 events.push((
2741 EventType::Alter,
2742 EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
2743 id: replacement.id().to_string(),
2744 name: target_id_name.name,
2745 old_cluster_id: old_cluster_id.to_string(),
2746 new_cluster_id: new_cluster_id.to_string(),
2747 }),
2748 ));
2749 }
2750 }
2751
2752 events
2753}
2754
/// The IDs of catalog objects selected for dropping, grouped by object kind.
///
/// Built with [`ObjectsToDrop::generate`], which validates each requested object as it is added.
#[derive(Debug, Default)]
pub(crate) struct ObjectsToDrop {
    /// Comment object IDs of every object that was added.
    pub comments: BTreeSet<CommentObjectId>,
    /// Databases to drop.
    pub databases: BTreeSet<DatabaseId>,
    /// Schemas to drop, keyed by schema specifier and mapped to their database specifier.
    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
    /// Clusters to drop.
    pub clusters: BTreeSet<ClusterId>,
    /// Cluster replicas to drop, mapped to their owning cluster and the supplied drop reason.
    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
    /// Roles to drop.
    pub roles: BTreeSet<RoleId>,
    /// Items to drop, in the order they were added (a `Vec`, so not deduplicated).
    pub items: Vec<CatalogItemId>,
    /// Network policies to drop.
    pub network_policies: BTreeSet<NetworkPolicyId>,
}
2773
2774impl ObjectsToDrop {
2775 pub fn generate(
2776 drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2777 state: &CatalogState,
2778 session: Option<&ConnMeta>,
2779 ) -> Result<Self, AdapterError> {
2780 let mut delta = ObjectsToDrop::default();
2781
2782 for drop_object_info in drop_object_infos {
2783 delta.add_item(drop_object_info, state, session)?;
2784 }
2785
2786 Ok(delta)
2787 }
2788
2789 fn add_item(
2790 &mut self,
2791 drop_object_info: DropObjectInfo,
2792 state: &CatalogState,
2793 session: Option<&ConnMeta>,
2794 ) -> Result<(), AdapterError> {
2795 self.comments
2796 .insert(state.get_comment_id(drop_object_info.to_object_id()));
2797
2798 match drop_object_info {
2799 DropObjectInfo::Database(database_id) => {
2800 let database = &state.database_by_id[&database_id];
2801 if database_id.is_system() {
2802 return Err(AdapterError::Catalog(Error::new(
2803 ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2804 )));
2805 }
2806
2807 self.databases.insert(database_id);
2808 }
2809 DropObjectInfo::Schema((database_spec, schema_spec)) => {
2810 let schema = state.get_schema(
2811 &database_spec,
2812 &schema_spec,
2813 session
2814 .map(|session| session.conn_id())
2815 .unwrap_or(&SYSTEM_CONN_ID),
2816 );
2817 let schema_id: SchemaId = schema_spec.into();
2818 if schema_id.is_system() {
2819 let name = schema.name();
2820 let full_name = state.resolve_full_schema_name(name);
2821 return Err(AdapterError::Catalog(Error::new(
2822 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2823 )));
2824 }
2825
2826 self.schemas.insert(schema_spec, database_spec);
2827 }
2828 DropObjectInfo::Role(role_id) => {
2829 let name = state.get_role(&role_id).name().to_string();
2830 if role_id.is_system() || role_id.is_predefined() {
2831 return Err(AdapterError::Catalog(Error::new(
2832 ErrorKind::ReservedRoleName(name.clone()),
2833 )));
2834 }
2835 state.ensure_not_reserved_role(&role_id)?;
2836
2837 self.roles.insert(role_id);
2838 }
2839 DropObjectInfo::Cluster(cluster_id) => {
2840 let cluster = state.get_cluster(cluster_id);
2841 let name = &cluster.name;
2842 if cluster_id.is_system() {
2843 return Err(AdapterError::Catalog(Error::new(
2844 ErrorKind::ReadOnlyCluster(name.clone()),
2845 )));
2846 }
2847
2848 self.clusters.insert(cluster_id);
2849 }
2850 DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2851 let cluster = state.get_cluster(cluster_id);
2852 let replica = cluster.replica(replica_id).expect("Must exist");
2853
2854 self.replicas
2855 .insert(replica.replica_id, (cluster.id, reason));
2856 }
2857 DropObjectInfo::Item(item_id) => {
2858 let entry = state.get_entry(&item_id);
2859 if item_id.is_system() {
2860 let name = entry.name();
2861 let full_name =
2862 state.resolve_full_name(name, session.map(|session| session.conn_id()));
2863 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2864 full_name.to_string(),
2865 ))));
2866 }
2867
2868 self.items.push(item_id);
2869 }
2870 DropObjectInfo::NetworkPolicy(network_policy_id) => {
2871 let policy = state.get_network_policy(&network_policy_id);
2872 let name = &policy.name;
2873 if network_policy_id.is_system() {
2874 return Err(AdapterError::Catalog(Error::new(
2875 ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
2876 )));
2877 }
2878
2879 self.network_policies.insert(network_policy_id);
2880 }
2881 }
2882
2883 Ok(())
2884 }
2885}
2886
#[cfg(test)]
mod tests {
    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
    use mz_repr::role_id::RoleId;

    use crate::catalog::Catalog;

    /// Verifies that `Catalog::update_privilege_owners` merges the old owner's entry into the
    /// new owner's pre-existing entry, whether the owner appears as grantor, as grantee, or as
    /// both.
    #[mz_ore::test]
    fn test_update_privilege_owners() {
        let old_owner = RoleId::User(1);
        let new_owner = RoleId::User(2);
        let other_role = RoleId::User(3);

        let acl = |grantee: RoleId, grantor: RoleId, acl_mode: AclMode| MzAclItem {
            grantee,
            grantor,
            acl_mode,
        };

        // Each case pairs the starting privileges with the `(grantee, grantor)` that must hold
        // the single merged SELECT + UPDATE privilege afterwards.
        let cases = [
            // Owner appears as grantor.
            (
                vec![
                    acl(other_role, old_owner, AclMode::UPDATE),
                    acl(other_role, new_owner, AclMode::SELECT),
                ],
                (other_role, new_owner),
            ),
            // Owner appears as grantee.
            (
                vec![
                    acl(old_owner, other_role, AclMode::UPDATE),
                    acl(new_owner, other_role, AclMode::SELECT),
                ],
                (new_owner, other_role),
            ),
            // Owner appears as both grantee and grantor.
            (
                vec![
                    acl(old_owner, old_owner, AclMode::UPDATE),
                    acl(new_owner, new_owner, AclMode::SELECT),
                ],
                (new_owner, new_owner),
            ),
        ];

        for (initial, (grantee, grantor)) in cases {
            let mut privileges = PrivilegeMap::from_mz_acl_items(initial);
            Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);

            assert_eq!(1, privileges.all_values().count());
            assert_eq!(
                vec![acl(
                    grantee,
                    grantor,
                    AclMode::SELECT.union(AclMode::UPDATE)
                )],
                privileges.all_values_owned().collect::<Vec<_>>()
            );
        }
    }
}