1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22 WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26 ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Snapshot, Transaction};
31use mz_catalog::expr_cache::LocalExpressions;
32use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
33use mz_catalog::memory::objects::{
34 CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, DefaultPrivileges, SourceReferences,
35 StateDiff, StateUpdate, StateUpdateKind, TemporaryItem,
36};
37use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
38use mz_controller_types::{ClusterId, ReplicaId};
39use mz_ore::collections::HashSet;
40use mz_ore::instrument;
41use mz_persist_types::ShardId;
42use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
43use mz_repr::network_policy_id::NetworkPolicyId;
44use mz_repr::optimize::OptimizerFeatures;
45use mz_repr::role_id::RoleId;
46use mz_repr::{CatalogItemId, ColumnName, GlobalId, SqlColumnType, strconv};
47use mz_sql::ast::RawDataType;
48use mz_sql::catalog::{
49 AutoProvisionSource, CatalogDatabase, CatalogError as SqlCatalogError,
50 CatalogItem as SqlCatalogItem, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
51 DefaultPrivilegeObject, PasswordAction, PasswordConfig, RoleAttributesRaw, RoleMembership,
52 RoleVars,
53};
54use mz_sql::names::{
55 CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
56 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
57};
58use mz_sql::plan::{NetworkPolicyRule, PlanError};
59use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
60use mz_sql::session::vars::OwnedVarInput;
61use mz_sql::session::vars::{Value as VarValue, VarInput};
62use mz_sql::{DEFAULT_SCHEMA, rbac};
63use mz_sql_parser::ast::{QualifiedReplica, Value};
64use mz_storage_client::storage_collections::StorageCollections;
65use serde::{Deserialize, Serialize};
66use tracing::{info, trace};
67
68use crate::AdapterError;
69use crate::catalog::state::LocalExpressionCache;
70use crate::catalog::{
71 BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
72 catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
73 is_reserved_role_name, object_type_to_audit_object_type,
74 system_object_type_to_audit_object_type,
75};
76use crate::config::ScopedParameters;
77use crate::coord::ConnMeta;
78use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
79use crate::coord::cluster_scheduling::SchedulingDecision;
80use crate::util::ResultExt;
81
/// A single audit event carried by [`Op::InjectAuditEvents`].
///
/// The type derives `Serialize`/`Deserialize`, so the field names and order are part of its
/// serialized representation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InjectedAuditEvent {
    /// The kind of event being recorded.
    pub event_type: EventType,
    /// The type of object the event refers to.
    pub object_type: ObjectType,
    /// The event payload.
    pub details: EventDetails,
    /// NOTE(review): presumably the user the event is attributed to, with `None` meaning no
    /// user is recorded — confirm against the `InjectAuditEvents` handler.
    pub user: Option<String>,
}
93
/// A single catalog operation.
///
/// A list of `Op`s is handed to [`Catalog::transact`] (or
/// [`Catalog::transact_incremental_dry_run`]); each one is applied in order by
/// `Catalog::transact_op`.
#[derive(Debug, Clone)]
pub enum Op {
    /// Changes the retain-history setting of item `id`.
    ///
    /// System items are rejected as read-only. `value` is what gets written to the audit log as
    /// the new setting; `window` is the compaction window handed to the item.
    AlterRetainHistory {
        id: CatalogItemId,
        value: Option<Value>,
        window: CompactionWindow,
    },
    /// Changes the timestamp interval of item `id`.
    ///
    /// System items are rejected as read-only. `value` is what gets written to the audit log as
    /// the new setting; `interval` is the duration handed to the item.
    AlterSourceTimestampInterval {
        id: CatalogItemId,
        value: Option<Value>,
        interval: Duration,
    },
    /// Replaces the attributes and variables of role `id`; reserved roles are rejected.
    ///
    /// The password is cleared when `nopassword` is set, replaced when `attributes` carries a
    /// password, and left untouched otherwise. `name` is only used for audit and log output.
    AlterRole {
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        nopassword: bool,
        vars: RoleVars,
    },
    /// Replaces the rules of network policy `id`.
    ///
    /// A reserved `name` is rejected. `owner_id` is ignored by the handler.
    AlterNetworkPolicy {
        id: NetworkPolicyId,
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    /// Adds column `name` of type `typ` to table `id`.
    ///
    /// `new_global_id` is recorded as the table's collection for the new version and is
    /// registered against the shard of the table's latest global ID. Non-table items are
    /// rejected.
    AlterAddColumn {
        id: CatalogItemId,
        new_global_id: GlobalId,
        name: ColumnName,
        typ: SqlColumnType,
        sql: RawDataType,
    },
    /// Applies materialized view `replacement_id` as a replacement for materialized view `id`.
    ///
    /// The replacement item is removed from the catalog and comments on it are dropped.
    AlterMaterializedViewApplyReplacement {
        id: CatalogItemId,
        replacement_id: CatalogItemId,
    },
    /// Creates a user database called `name`, together with a `DEFAULT_SCHEMA` schema in it.
    CreateDatabase {
        name: String,
        owner_id: RoleId,
    },
    /// Creates a user schema in `database_id`.
    ///
    /// Reserved schema names and the ambient database specifier are rejected.
    CreateSchema {
        database_id: ResolvedDatabaseSpecifier,
        schema_name: String,
        owner_id: RoleId,
    },
    /// Creates a user role with no memberships and default variables; reserved names are
    /// rejected.
    CreateRole {
        name: String,
        attributes: RoleAttributesRaw,
    },
    /// Creates a user cluster with the given ID; reserved names are rejected.
    ///
    /// An introspection source index ID is allocated for each entry of `introspection_sources`.
    CreateCluster {
        id: ClusterId,
        name: String,
        introspection_sources: Vec<&'static BuiltinLog>,
        owner_id: RoleId,
        config: ClusterConfig,
    },
    /// Creates a replica in cluster `cluster_id`; reserved names are rejected.
    ///
    /// `reason` is converted into audit log form for managed replicas.
    CreateClusterReplica {
        cluster_id: ClusterId,
        name: String,
        config: ReplicaConfig,
        owner_id: RoleId,
        reason: ReplicaCreateDropReason,
    },
    /// Creates a catalog item.
    ///
    /// Items whose `conn_id()` is set are treated as temporary and checked for name conflicts
    /// before the transaction starts (see `Catalog::temporary_ids`).
    CreateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        item: CatalogItem,
        owner_id: RoleId,
    },
    // NOTE(review): the handlers for the remaining variants are not documented here; their
    // meaning is presumably what the names suggest — confirm in `Catalog::transact_op`.
    CreateNetworkPolicy {
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    Comment {
        object_id: CommentObjectId,
        sub_component: Option<usize>,
        comment: Option<String>,
    },
    /// Drops the listed objects.
    ///
    /// [`Catalog::transact`] inspects the item entries of this list up front to find temporary
    /// items that are being dropped in the same transaction.
    DropObjects(Vec<DropObjectInfo>),
    GrantRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    RenameCluster {
        id: ClusterId,
        name: String,
        to_name: String,
        check_reserved_names: bool,
    },
    RenameClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: QualifiedReplica,
        to_name: String,
    },
    RenameItem {
        id: CatalogItemId,
        current_full_name: FullItemName,
        to_name: String,
    },
    RenameSchema {
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        new_name: String,
        check_reserved_names: bool,
    },
    UpdateOwner {
        id: ObjectId,
        new_owner: RoleId,
    },
    UpdatePrivilege {
        target_id: SystemObjectId,
        privilege: MzAclItem,
        variant: UpdatePrivilegeVariant,
    },
    UpdateDefaultPrivilege {
        privilege_object: DefaultPrivilegeObject,
        privilege_acl_item: DefaultPrivilegeAclItem,
        variant: UpdatePrivilegeVariant,
    },
    RevokeRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    UpdateClusterConfig {
        id: ClusterId,
        name: String,
        config: ClusterConfig,
    },
    UpdateClusterReplicaConfig {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        config: ReplicaConfig,
    },
    UpdateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        to_item: CatalogItem,
    },
    UpdateSourceReferences {
        source_id: CatalogItemId,
        references: SourceReferences,
    },
    UpdateSystemConfiguration {
        name: String,
        value: OwnedVarInput,
    },
    ResetSystemConfiguration {
        name: String,
    },
    ResetAllSystemConfiguration,
    UpdateScopedSystemParameters {
        scoped: ScopedParameters,
    },
    /// Carries a batch of [`InjectedAuditEvent`]s.
    InjectAuditEvents {
        events: Vec<InjectedAuditEvent>,
    },
}
267
/// Identifies an object to drop as part of [`Op::DropObjects`].
///
/// The variants mirror those of `ObjectId`, except that a cluster replica additionally carries
/// the [`ReplicaCreateDropReason`] for the drop.
#[derive(Debug, Clone)]
pub enum DropObjectInfo {
    Cluster(ClusterId),
    /// The cluster, the replica within it, and why the replica is being dropped.
    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
    Database(DatabaseId),
    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
    Role(RoleId),
    Item(CatalogItemId),
    NetworkPolicy(NetworkPolicyId),
}
281
282impl DropObjectInfo {
283 pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
286 match id {
287 ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
288 ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
289 cluster_id,
290 replica_id,
291 ReplicaCreateDropReason::Manual,
292 )),
293 ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
294 ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
295 ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
296 ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
297 ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
298 }
299 }
300
301 fn to_object_id(&self) -> ObjectId {
304 match &self {
305 DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
306 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
307 ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
308 }
309 DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
310 DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
311 DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
312 DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
313 DropObjectInfo::NetworkPolicy(network_policy_id) => {
314 ObjectId::NetworkPolicy(network_policy_id.clone())
315 }
316 }
317 }
318}
319
/// Why a cluster replica is being created or dropped.
///
/// The reason ends up in the audit log via [`ReplicaCreateDropReason::into_audit_log`].
#[derive(Debug, Clone)]
pub enum ReplicaCreateDropReason {
    /// Recorded in the audit log as `CreateOrDropClusterReplicaReasonV1::Manual`, with no
    /// scheduling details.
    Manual,
    /// Recorded in the audit log as `CreateOrDropClusterReplicaReasonV1::Schedule`, together
    /// with the contained scheduling decisions.
    ClusterScheduling(Vec<SchedulingDecision>),
}
332
333impl ReplicaCreateDropReason {
334 pub fn into_audit_log(
335 self,
336 ) -> (
337 CreateOrDropClusterReplicaReasonV1,
338 Option<SchedulingDecisionsWithReasonsV2>,
339 ) {
340 let (reason, scheduling_policies) = match self {
341 ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
342 ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
343 CreateOrDropClusterReplicaReasonV1::Schedule,
344 Some(scheduling_decisions),
345 ),
346 };
347 (
348 reason,
349 scheduling_policies
350 .as_ref()
351 .map(SchedulingDecision::reasons_to_audit_log_reasons),
352 )
353 }
354}
355
/// Everything [`Catalog::transact`] hands back to its caller after a successful commit.
pub struct TransactionResult {
    /// Builtin table updates derived from applying the transaction's updates to the catalog
    /// state.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Parsed catalog updates derived from applying the transaction's updates to the catalog
    /// state.
    pub catalog_updates: Vec<ParsedStateUpdate>,
    /// Audit events collected while the ops were applied.
    pub audit_events: Vec<VersionedEvent>,
}
362
/// Selects how `Catalog::transact_inner` treats storage state.
#[derive(Debug, Clone, Copy)]
enum TransactInnerMode {
    /// Used by [`Catalog::transact`]: if storage collections are provided, their state is
    /// prepared inside the transaction.
    Commit,
    /// Used by [`Catalog::transact_incremental_dry_run`]: no storage state is prepared, and a
    /// debug assertion checks that no storage collections were passed in.
    DryRun,
}
377
378impl Catalog {
379 fn should_audit_log_item(item: &CatalogItem) -> bool {
380 !item.is_temporary()
381 }
382
383 fn temporary_ids(
386 &self,
387 ops: &[Op],
388 temporary_drops: BTreeSet<(&ConnectionId, String)>,
389 ) -> Result<BTreeSet<CatalogItemId>, Error> {
390 let mut creating = BTreeSet::new();
391 let mut temporary_ids = BTreeSet::new();
392 for op in ops.iter() {
393 if let Op::CreateItem {
394 id,
395 name,
396 item,
397 owner_id: _,
398 } = op
399 {
400 if let Some(conn_id) = item.conn_id() {
401 if self.item_exists_in_temp_schemas(conn_id, &name.item)
402 && !temporary_drops.contains(&(conn_id, name.item.clone()))
403 || creating.contains(&(conn_id, &name.item))
404 {
405 return Err(
406 SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
407 );
408 } else {
409 creating.insert((conn_id, &name.item));
410 temporary_ids.insert(id.clone());
411 }
412 }
413 }
414 }
415 Ok(temporary_ids)
416 }
417
418 #[instrument(name = "catalog::transact")]
419 pub async fn transact(
420 &mut self,
421 storage_collections: Option<&mut Arc<dyn StorageCollections + Send + Sync>>,
424 oracle_write_ts: mz_repr::Timestamp,
425 session: Option<&ConnMeta>,
426 ops: Vec<Op>,
427 ) -> Result<TransactionResult, AdapterError> {
428 trace!("transact: {:?}", ops);
429 fail::fail_point!("catalog_transact", |arg| {
430 Err(AdapterError::Unstructured(anyhow::anyhow!(
431 "failpoint: {arg:?}"
432 )))
433 });
434
435 let drop_ids: BTreeSet<CatalogItemId> = ops
436 .iter()
437 .filter_map(|op| match op {
438 Op::DropObjects(drop_object_infos) => {
439 let ids = drop_object_infos.iter().map(|info| info.to_object_id());
440 let item_ids = ids.filter_map(|id| match id {
441 ObjectId::Item(id) => Some(id),
442 _ => None,
443 });
444 Some(item_ids)
445 }
446 _ => None,
447 })
448 .flatten()
449 .collect();
450 let temporary_drops = drop_ids
451 .iter()
452 .filter_map(|id| {
453 let entry = self.get_entry(id);
454 match entry.item().conn_id() {
455 Some(conn_id) => Some((conn_id, entry.name().item.clone())),
456 None => None,
457 }
458 })
459 .collect();
460
461 let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
462 let mut builtin_table_updates = vec![];
463 let mut catalog_updates = vec![];
464 let mut audit_events = vec![];
465 let mut storage = self.storage().await;
466 let mut tx = storage
467 .transaction()
468 .await
469 .unwrap_or_terminate("starting catalog transaction");
470
471 let new_state = Self::transact_inner(
472 TransactInnerMode::Commit,
473 storage_collections,
474 oracle_write_ts,
475 session,
476 ops,
477 temporary_ids,
478 &mut builtin_table_updates,
479 &mut catalog_updates,
480 &mut audit_events,
481 &mut tx,
482 &self.state,
483 )
484 .await?;
485
486 tx.commit(oracle_write_ts)
491 .await
492 .unwrap_or_terminate("catalog storage transaction commit must succeed");
493
494 drop(storage);
497 if let Some(new_state) = new_state {
498 self.transient_revision += 1;
499 self.state = new_state;
500 }
501
502 Ok(TransactionResult {
503 builtin_table_updates,
504 catalog_updates,
505 audit_events,
506 })
507 }
508
509 pub async fn transact_incremental_dry_run(
524 &self,
525 base_state: &CatalogState,
526 ops: Vec<Op>,
527 session: Option<&ConnMeta>,
528 prev_snapshot: Option<Snapshot>,
529 oracle_write_ts: mz_repr::Timestamp,
530 ) -> Result<(CatalogState, Snapshot), AdapterError> {
531 let temporary_ids = self.temporary_ids(&ops, BTreeSet::new())?;
534
535 let mut builtin_table_updates = vec![];
536 let mut catalog_updates = vec![];
537 let mut audit_events = vec![];
538 let mut storage = self.storage().await;
539 let mut tx = if let Some(snapshot) = prev_snapshot {
540 storage
543 .transaction_from_snapshot(snapshot)
544 .unwrap_or_terminate("starting catalog transaction from snapshot")
545 } else {
546 storage
549 .transaction()
550 .await
551 .unwrap_or_terminate("starting catalog transaction")
552 };
553
554 let new_state = Self::transact_inner(
556 TransactInnerMode::DryRun,
557 None,
558 oracle_write_ts,
559 session,
560 ops,
561 temporary_ids,
562 &mut builtin_table_updates,
563 &mut catalog_updates,
564 &mut audit_events,
565 &mut tx,
566 base_state,
567 )
568 .await?;
569
570 let new_snapshot = tx.current_snapshot();
573
574 drop(storage);
576
577 let state = new_state.unwrap_or_else(|| base_state.clone());
579 Ok((state, new_snapshot))
580 }
581
582 fn extract_expressions_from_ops(
586 ops: &[Op],
587 optimizer_features: &OptimizerFeatures,
588 ) -> BTreeMap<GlobalId, LocalExpressions> {
589 let mut exprs = BTreeMap::new();
590
591 for op in ops {
592 if let Op::CreateItem { item, .. } = op {
593 match item {
594 CatalogItem::View(view) => {
595 exprs.insert(
596 view.global_id,
597 LocalExpressions {
598 local_mir: (*view.locally_optimized_expr).clone(),
599 optimizer_features: optimizer_features.clone(),
600 },
601 );
602 }
603 CatalogItem::MaterializedView(mv) => {
604 exprs.insert(
605 mv.global_id_writes(),
606 LocalExpressions {
607 local_mir: (*mv.locally_optimized_expr).clone(),
608 optimizer_features: optimizer_features.clone(),
609 },
610 );
611 }
612 CatalogItem::Table(_)
613 | CatalogItem::Source(_)
614 | CatalogItem::Log(_)
615 | CatalogItem::Sink(_)
616 | CatalogItem::Index(_)
617 | CatalogItem::Type(_)
618 | CatalogItem::Func(_)
619 | CatalogItem::Secret(_)
620 | CatalogItem::Connection(_) => {}
621 }
622 }
623 }
624
625 exprs
626 }
627
628 #[instrument(name = "catalog::transact_inner")]
636 async fn transact_inner(
637 mode: TransactInnerMode,
638 storage_collections: Option<&mut Arc<dyn StorageCollections + Send + Sync>>,
639 oracle_write_ts: mz_repr::Timestamp,
640 session: Option<&ConnMeta>,
641 ops: Vec<Op>,
642 temporary_ids: BTreeSet<CatalogItemId>,
643 builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
644 parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
645 audit_events: &mut Vec<VersionedEvent>,
646 tx: &mut Transaction<'_>,
647 state: &CatalogState,
648 ) -> Result<Option<CatalogState>, AdapterError> {
649 let mut preliminary_state = Cow::Borrowed(state);
676
677 let mut state = Cow::Borrowed(state);
679
680 if ops.is_empty() {
681 return Ok(None);
682 }
683
684 let optimizer_features = OptimizerFeatures::from(state.system_config());
687 let cached_exprs = Self::extract_expressions_from_ops(&ops, &optimizer_features);
688
689 let mut storage_collections_to_create = BTreeSet::new();
690 let mut storage_collections_to_drop = BTreeSet::new();
691 let mut storage_collections_to_register = BTreeMap::new();
692
693 let mut updates = Vec::new();
694
695 for op in ops {
696 let temporary_item_updates = Self::transact_op(
697 oracle_write_ts,
698 session,
699 op,
700 &temporary_ids,
701 audit_events,
702 tx,
703 &*preliminary_state,
704 &mut storage_collections_to_create,
705 &mut storage_collections_to_drop,
706 &mut storage_collections_to_register,
707 )
708 .await?;
709
710 let upper = tx.upper();
715 let temporary_item_updates =
716 temporary_item_updates
717 .into_iter()
718 .map(|(item, diff)| StateUpdate {
719 kind: StateUpdateKind::TemporaryItem(item),
720 ts: upper,
721 diff,
722 });
723
724 let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
725 op_updates.extend(temporary_item_updates);
726 if !op_updates.is_empty() {
727 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
730 let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
731 .to_mut()
732 .apply_updates(op_updates.clone(), &mut local_expr_cache)
733 .await;
734 }
735 updates.append(&mut op_updates);
736 }
737
738 if !updates.is_empty() {
739 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
740 let (op_builtin_table_updates, op_catalog_updates) = state
741 .to_mut()
742 .apply_updates(updates.clone(), &mut local_expr_cache)
743 .await;
744 let op_builtin_table_updates = state
745 .to_mut()
746 .resolve_builtin_table_updates(op_builtin_table_updates);
747 builtin_table_updates.extend(op_builtin_table_updates);
748 parsed_catalog_updates.extend(op_catalog_updates);
749 }
750
751 match mode {
752 TransactInnerMode::Commit => {
753 if let Some(c) = storage_collections {
755 c.prepare_state(
756 tx,
757 storage_collections_to_create,
758 storage_collections_to_drop,
759 storage_collections_to_register,
760 )
761 .await?;
762 }
763 }
764 TransactInnerMode::DryRun => {
765 debug_assert!(
766 storage_collections.is_none(),
767 "dry-run mode must not prepare storage state"
768 );
769 }
770 }
771
772 let updates = tx.get_and_commit_op_updates();
773 if !updates.is_empty() {
774 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
775 let (op_builtin_table_updates, op_catalog_updates) = state
776 .to_mut()
777 .apply_updates(updates.clone(), &mut local_expr_cache)
778 .await;
779 let op_builtin_table_updates = state
780 .to_mut()
781 .resolve_builtin_table_updates(op_builtin_table_updates);
782 builtin_table_updates.extend(op_builtin_table_updates);
783 parsed_catalog_updates.extend(op_catalog_updates);
784 }
785
786 match state {
787 Cow::Owned(state) => Ok(Some(state)),
788 Cow::Borrowed(_) => Ok(None),
789 }
790 }
791
792 #[instrument]
800 async fn transact_op(
801 oracle_write_ts: mz_repr::Timestamp,
802 session: Option<&ConnMeta>,
803 op: Op,
804 temporary_ids: &BTreeSet<CatalogItemId>,
805 audit_events: &mut Vec<VersionedEvent>,
806 tx: &mut Transaction<'_>,
807 state: &CatalogState,
808 storage_collections_to_create: &mut BTreeSet<GlobalId>,
809 storage_collections_to_drop: &mut BTreeSet<GlobalId>,
810 storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
811 ) -> Result<Vec<(TemporaryItem, StateDiff)>, AdapterError> {
812 let mut temporary_item_updates = Vec::new();
813
814 match op {
815 Op::AlterRetainHistory { id, value, window } => {
816 let entry = state.get_entry(&id);
817 if id.is_system() {
818 let name = entry.name();
819 let full_name =
820 state.resolve_full_name(name, session.map(|session| session.conn_id()));
821 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
822 full_name.to_string(),
823 ))));
824 }
825
826 let mut new_entry = entry.clone();
827 let previous = new_entry
828 .item
829 .update_retain_history(value.clone(), window)
830 .map_err(|_| {
831 AdapterError::Catalog(Error::new(ErrorKind::Internal(
832 "planner should have rejected invalid alter retain history item type"
833 .to_string(),
834 )))
835 })?;
836
837 if Self::should_audit_log_item(new_entry.item()) {
838 let details =
839 EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
840 id: id.to_string(),
841 old_history: previous.map(|previous| previous.to_string()),
842 new_history: value.map(|v| v.to_string()),
843 });
844 CatalogState::add_to_audit_log(
845 &state.system_configuration,
846 oracle_write_ts,
847 session,
848 tx,
849 audit_events,
850 EventType::Alter,
851 catalog_type_to_audit_object_type(new_entry.item().typ()),
852 details,
853 )?;
854 }
855
856 tx.update_item(id, new_entry.into())?;
857
858 Self::log_update(state, &id);
859 }
860 Op::AlterSourceTimestampInterval {
861 id,
862 value,
863 interval,
864 } => {
865 let entry = state.get_entry(&id);
866 if id.is_system() {
867 let name = entry.name();
868 let full_name =
869 state.resolve_full_name(name, session.map(|session| session.conn_id()));
870 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
871 full_name.to_string(),
872 ))));
873 }
874
875 let mut new_entry = entry.clone();
876 let previous = new_entry
877 .item
878 .update_timestamp_interval(value.clone(), interval)
879 .map_err(|_| {
880 AdapterError::Catalog(Error::new(ErrorKind::Internal(
881 "planner should have rejected invalid alter timestamp interval item type"
882 .to_string(),
883 )))
884 })?;
885
886 if Self::should_audit_log_item(new_entry.item()) {
887 let details = EventDetails::AlterSourceTimestampIntervalV1(
888 mz_audit_log::AlterSourceTimestampIntervalV1 {
889 id: id.to_string(),
890 old_interval: previous.map(|previous| previous.to_string()),
891 new_interval: value.map(|v| v.to_string()),
892 },
893 );
894 CatalogState::add_to_audit_log(
895 &state.system_configuration,
896 oracle_write_ts,
897 session,
898 tx,
899 audit_events,
900 EventType::Alter,
901 catalog_type_to_audit_object_type(new_entry.item().typ()),
902 details,
903 )?;
904 }
905
906 tx.update_item(id, new_entry.into())?;
907
908 Self::log_update(state, &id);
909 }
910 Op::AlterRole {
911 id,
912 name,
913 attributes,
914 nopassword,
915 vars,
916 } => {
917 state.ensure_not_reserved_role(&id)?;
918
919 let mut existing_role = state.get_role(&id).clone();
920 let password = attributes.password.clone();
921 let scram_iterations = attributes
922 .scram_iterations
923 .unwrap_or_else(|| state.system_config().scram_iterations());
924 existing_role.attributes = attributes.into();
925 existing_role.vars = vars;
926 let password_action = if nopassword {
927 PasswordAction::Clear
928 } else if let Some(password) = password {
929 PasswordAction::Set(PasswordConfig {
930 password,
931 scram_iterations,
932 })
933 } else {
934 PasswordAction::NoChange
935 };
936 tx.update_role(id, existing_role.into(), password_action)?;
937
938 CatalogState::add_to_audit_log(
939 &state.system_configuration,
940 oracle_write_ts,
941 session,
942 tx,
943 audit_events,
944 EventType::Alter,
945 ObjectType::Role,
946 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
947 id: id.to_string(),
948 name: name.clone(),
949 }),
950 )?;
951
952 info!("update role {name} ({id})");
953 }
954 Op::AlterNetworkPolicy {
955 id,
956 rules,
957 name,
958 owner_id: _owner_id,
959 } => {
960 let existing_policy = state.get_network_policy(&id).clone();
961 let mut policy: NetworkPolicy = existing_policy.into();
962 policy.rules = rules;
963 if is_reserved_name(&name) {
964 return Err(AdapterError::Catalog(Error::new(
965 ErrorKind::ReservedNetworkPolicyName(name),
966 )));
967 }
968 tx.update_network_policy(id, policy.clone())?;
969
970 CatalogState::add_to_audit_log(
971 &state.system_configuration,
972 oracle_write_ts,
973 session,
974 tx,
975 audit_events,
976 EventType::Alter,
977 ObjectType::NetworkPolicy,
978 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
979 id: id.to_string(),
980 name: name.clone(),
981 }),
982 )?;
983
984 info!("update network policy {name} ({id})");
985 }
986 Op::AlterAddColumn {
987 id,
988 new_global_id,
989 name,
990 typ,
991 sql,
992 } => {
993 let column_name = name.to_string();
994 let column_type = typ.to_string();
995 let nullable = typ.nullable;
996 let mut new_entry = state.get_entry(&id).clone();
997 let version = new_entry.item.add_column(name, typ, sql)?;
998 let shard_id = state
1001 .storage_metadata()
1002 .get_collection_shard(new_entry.latest_global_id())?;
1003
1004 let CatalogItem::Table(table) = &mut new_entry.item else {
1006 return Err(AdapterError::Unsupported("adding columns to non-Table"));
1007 };
1008 table.collections.insert(version, new_global_id);
1009
1010 if Self::should_audit_log_item(new_entry.item()) {
1011 let details = EventDetails::AlterAddColumnV1(mz_audit_log::AlterAddColumnV1 {
1012 id: id.to_string(),
1013 column: column_name,
1014 column_type,
1015 nullable,
1016 });
1017 CatalogState::add_to_audit_log(
1018 &state.system_configuration,
1019 oracle_write_ts,
1020 session,
1021 tx,
1022 audit_events,
1023 EventType::Alter,
1024 catalog_type_to_audit_object_type(new_entry.item().typ()),
1025 details,
1026 )?;
1027 }
1028
1029 tx.update_item(id, new_entry.into())?;
1030 storage_collections_to_register.insert(new_global_id, shard_id);
1031 }
1032 Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
1033 let mut new_entry = state.get_entry(&id).clone();
1034 let replacement = state.get_entry(&replacement_id);
1035
1036 let new_audit_events =
1037 apply_replacement_audit_events(state, &new_entry, replacement);
1038
1039 let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
1040 return Err(AdapterError::internal(
1041 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1042 "id must refer to a materialized view",
1043 ));
1044 };
1045 let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
1046 return Err(AdapterError::internal(
1047 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1048 "replacement_id must refer to a materialized view",
1049 ));
1050 };
1051
1052 mv.apply_replacement(replacement_mv.clone());
1053
1054 tx.remove_item(replacement_id)?;
1055
1056 new_entry.id = replacement_id;
1057 tx_replace_item(tx, state, id, new_entry)?;
1058
1059 let comment_id = CommentObjectId::MaterializedView(replacement_id);
1060 tx.drop_comments(&[comment_id].into())?;
1061
1062 for (event_type, details) in new_audit_events {
1063 CatalogState::add_to_audit_log(
1064 &state.system_configuration,
1065 oracle_write_ts,
1066 session,
1067 tx,
1068 audit_events,
1069 event_type,
1070 ObjectType::MaterializedView,
1071 details,
1072 )?;
1073 }
1074 }
1075 Op::CreateDatabase { name, owner_id } => {
1076 let database_owner_privileges = vec![rbac::owner_privilege(
1077 mz_sql::catalog::ObjectType::Database,
1078 owner_id,
1079 )];
1080 let database_default_privileges = state
1081 .default_privileges
1082 .get_applicable_privileges(
1083 owner_id,
1084 None,
1085 None,
1086 mz_sql::catalog::ObjectType::Database,
1087 )
1088 .map(|item| item.mz_acl_item(owner_id));
1089 let database_privileges: Vec<_> = merge_mz_acl_items(
1090 database_owner_privileges
1091 .into_iter()
1092 .chain(database_default_privileges),
1093 )
1094 .collect();
1095
1096 let schema_owner_privileges = vec![rbac::owner_privilege(
1097 mz_sql::catalog::ObjectType::Schema,
1098 owner_id,
1099 )];
1100 let schema_default_privileges = state
1101 .default_privileges
1102 .get_applicable_privileges(
1103 owner_id,
1104 None,
1105 None,
1106 mz_sql::catalog::ObjectType::Schema,
1107 )
1108 .map(|item| item.mz_acl_item(owner_id))
1109 .chain(std::iter::once(MzAclItem {
1111 grantee: RoleId::Public,
1112 grantor: owner_id,
1113 acl_mode: AclMode::USAGE,
1114 }));
1115 let schema_privileges: Vec<_> = merge_mz_acl_items(
1116 schema_owner_privileges
1117 .into_iter()
1118 .chain(schema_default_privileges),
1119 )
1120 .collect();
1121
1122 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1123 let (database_id, _) = tx.insert_user_database(
1124 &name,
1125 owner_id,
1126 database_privileges.clone(),
1127 &temporary_oids,
1128 )?;
1129 let (schema_id, _) = tx.insert_user_schema(
1130 database_id,
1131 DEFAULT_SCHEMA,
1132 owner_id,
1133 schema_privileges.clone(),
1134 &temporary_oids,
1135 )?;
1136 CatalogState::add_to_audit_log(
1137 &state.system_configuration,
1138 oracle_write_ts,
1139 session,
1140 tx,
1141 audit_events,
1142 EventType::Create,
1143 ObjectType::Database,
1144 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1145 id: database_id.to_string(),
1146 name: name.clone(),
1147 }),
1148 )?;
1149 info!("create database {}", name);
1150
1151 CatalogState::add_to_audit_log(
1152 &state.system_configuration,
1153 oracle_write_ts,
1154 session,
1155 tx,
1156 audit_events,
1157 EventType::Create,
1158 ObjectType::Schema,
1159 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1160 id: schema_id.to_string(),
1161 name: DEFAULT_SCHEMA.to_string(),
1162 database_name: Some(name),
1163 }),
1164 )?;
1165 }
1166 Op::CreateSchema {
1167 database_id,
1168 schema_name,
1169 owner_id,
1170 } => {
1171 if is_reserved_name(&schema_name) {
1172 return Err(AdapterError::Catalog(Error::new(
1173 ErrorKind::ReservedSchemaName(schema_name),
1174 )));
1175 }
1176 let database_id = match database_id {
1177 ResolvedDatabaseSpecifier::Id(id) => id,
1178 ResolvedDatabaseSpecifier::Ambient => {
1179 return Err(AdapterError::Catalog(Error::new(
1180 ErrorKind::ReadOnlySystemSchema(schema_name),
1181 )));
1182 }
1183 };
1184 let owner_privileges = vec![rbac::owner_privilege(
1185 mz_sql::catalog::ObjectType::Schema,
1186 owner_id,
1187 )];
1188 let default_privileges = state
1189 .default_privileges
1190 .get_applicable_privileges(
1191 owner_id,
1192 Some(database_id),
1193 None,
1194 mz_sql::catalog::ObjectType::Schema,
1195 )
1196 .map(|item| item.mz_acl_item(owner_id));
1197 let privileges: Vec<_> =
1198 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1199 .collect();
1200 let (schema_id, _) = tx.insert_user_schema(
1201 database_id,
1202 &schema_name,
1203 owner_id,
1204 privileges.clone(),
1205 &state.get_temporary_oids().collect(),
1206 )?;
1207 CatalogState::add_to_audit_log(
1208 &state.system_configuration,
1209 oracle_write_ts,
1210 session,
1211 tx,
1212 audit_events,
1213 EventType::Create,
1214 ObjectType::Schema,
1215 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1216 id: schema_id.to_string(),
1217 name: schema_name.clone(),
1218 database_name: Some(state.database_by_id[&database_id].name.clone()),
1219 }),
1220 )?;
1221 }
1222 Op::CreateRole { name, attributes } => {
1223 if is_reserved_role_name(&name) {
1224 return Err(AdapterError::Catalog(Error::new(
1225 ErrorKind::ReservedRoleName(name),
1226 )));
1227 }
1228 let membership = RoleMembership::new();
1229 let vars = RoleVars::default();
1230 let (id, _) = tx.insert_user_role(
1231 name.clone(),
1232 attributes.clone(),
1233 membership.clone(),
1234 vars.clone(),
1235 &state.get_temporary_oids().collect(),
1236 )?;
1237 CatalogState::add_to_audit_log(
1238 &state.system_configuration,
1239 oracle_write_ts,
1240 session,
1241 tx,
1242 audit_events,
1243 EventType::Create,
1244 ObjectType::Role,
1245 EventDetails::CreateRoleV1(mz_audit_log::CreateRoleV1 {
1246 id: id.to_string(),
1247 name: name.clone(),
1248 auto_provision_source: attributes.auto_provision_source.map(|s| match s {
1249 AutoProvisionSource::Oidc => "oidc".to_string(),
1250 AutoProvisionSource::Frontegg => "frontegg".to_string(),
1251 AutoProvisionSource::None => "none".to_string(),
1252 }),
1253 }),
1254 )?;
1255 info!("create role {}", name);
1256 }
1257 Op::CreateCluster {
1258 id,
1259 name,
1260 introspection_sources,
1261 owner_id,
1262 config,
1263 } => {
1264 if is_reserved_name(&name) {
1265 return Err(AdapterError::Catalog(Error::new(
1266 ErrorKind::ReservedClusterName(name),
1267 )));
1268 }
1269 let owner_privileges = vec![rbac::owner_privilege(
1270 mz_sql::catalog::ObjectType::Cluster,
1271 owner_id,
1272 )];
1273 let default_privileges = state
1274 .default_privileges
1275 .get_applicable_privileges(
1276 owner_id,
1277 None,
1278 None,
1279 mz_sql::catalog::ObjectType::Cluster,
1280 )
1281 .map(|item| item.mz_acl_item(owner_id));
1282 let privileges: Vec<_> =
1283 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1284 .collect();
1285 let introspection_source_ids: Vec<_> = introspection_sources
1286 .iter()
1287 .map(|introspection_source| {
1288 Transaction::allocate_introspection_source_index_id(
1289 &id,
1290 introspection_source.variant,
1291 )
1292 })
1293 .collect();
1294
1295 let introspection_sources = introspection_sources
1296 .into_iter()
1297 .zip_eq(introspection_source_ids)
1298 .map(|(log, (item_id, gid))| (log, item_id, gid))
1299 .collect();
1300
1301 tx.insert_user_cluster(
1302 id,
1303 &name,
1304 introspection_sources,
1305 owner_id,
1306 privileges.clone(),
1307 config.clone().into(),
1308 &state.get_temporary_oids().collect(),
1309 )?;
1310 CatalogState::add_to_audit_log(
1311 &state.system_configuration,
1312 oracle_write_ts,
1313 session,
1314 tx,
1315 audit_events,
1316 EventType::Create,
1317 ObjectType::Cluster,
1318 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1319 id: id.to_string(),
1320 name: name.clone(),
1321 }),
1322 )?;
1323 info!("create cluster {}", name);
1324 }
1325 Op::CreateClusterReplica {
1326 cluster_id,
1327 name,
1328 config,
1329 owner_id,
1330 reason,
1331 } => {
1332 if is_reserved_name(&name) {
1333 return Err(AdapterError::Catalog(Error::new(
1334 ErrorKind::ReservedReplicaName(name),
1335 )));
1336 }
1337 let cluster = state.get_cluster(cluster_id);
1338 let id =
1339 tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1340 if let ReplicaLocation::Managed(ManagedReplicaLocation {
1341 size,
1342 billed_as,
1343 internal,
1344 ..
1345 }) = &config.location
1346 {
1347 let (reason, scheduling_policies) = reason.into_audit_log();
1348 let details = EventDetails::CreateClusterReplicaV4(
1349 mz_audit_log::CreateClusterReplicaV4 {
1350 cluster_id: cluster_id.to_string(),
1351 cluster_name: cluster.name.clone(),
1352 replica_id: Some(id.to_string()),
1353 replica_name: name.clone(),
1354 logical_size: size.clone(),
1355 billed_as: billed_as.clone(),
1356 internal: *internal,
1357 reason,
1358 scheduling_policies,
1359 },
1360 );
1361 CatalogState::add_to_audit_log(
1362 &state.system_configuration,
1363 oracle_write_ts,
1364 session,
1365 tx,
1366 audit_events,
1367 EventType::Create,
1368 ObjectType::ClusterReplica,
1369 details,
1370 )?;
1371 }
1372 }
1373 Op::CreateItem {
1374 id,
1375 name,
1376 item,
1377 owner_id,
1378 } => {
1379 state.check_unstable_dependencies(&item)?;
1380
1381 match &item {
1382 CatalogItem::Table(table) => {
1383 let gids: Vec<_> = table.global_ids().collect();
1384 assert_eq!(gids.len(), 1);
1385 storage_collections_to_create.extend(gids);
1386 }
1387 CatalogItem::Source(source) => {
1388 storage_collections_to_create.insert(source.global_id());
1389 }
1390 CatalogItem::MaterializedView(mv) => {
1391 let mv_gid = mv.global_id_writes();
1392 if let Some(target_id) = mv.replacement_target {
1393 let target_gid = state.get_entry(&target_id).latest_global_id();
1394 let shard_id =
1395 state.storage_metadata().get_collection_shard(target_gid)?;
1396 storage_collections_to_register.insert(mv_gid, shard_id);
1397 } else {
1398 storage_collections_to_create.insert(mv_gid);
1399 }
1400 }
1401 CatalogItem::Sink(sink) => {
1402 storage_collections_to_create.insert(sink.global_id());
1403 }
1404 CatalogItem::Log(_)
1405 | CatalogItem::View(_)
1406 | CatalogItem::Index(_)
1407 | CatalogItem::Type(_)
1408 | CatalogItem::Func(_)
1409 | CatalogItem::Secret(_)
1410 | CatalogItem::Connection(_) => (),
1411 }
1412
1413 let system_user = session.map_or(false, |s| s.user().is_system_user());
1414 if !system_user {
1415 if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1416 let cluster_name = state.clusters_by_id[&id].name.clone();
1417 return Err(AdapterError::Catalog(Error::new(
1418 ErrorKind::ReadOnlyCluster(cluster_name),
1419 )));
1420 }
1421 }
1422
1423 let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1424 let default_privileges = state
1425 .default_privileges
1426 .get_applicable_privileges(
1427 owner_id,
1428 name.qualifiers.database_spec.id(),
1429 Some(name.qualifiers.schema_spec.into()),
1430 item.typ().into(),
1431 )
1432 .map(|item| item.mz_acl_item(owner_id));
1433 let progress_source_privilege = if item.is_progress_source() {
1435 Some(MzAclItem {
1436 grantee: MZ_SUPPORT_ROLE_ID,
1437 grantor: owner_id,
1438 acl_mode: AclMode::SELECT,
1439 })
1440 } else {
1441 None
1442 };
1443 let privileges: Vec<_> = merge_mz_acl_items(
1444 owner_privileges
1445 .into_iter()
1446 .chain(default_privileges)
1447 .chain(progress_source_privilege),
1448 )
1449 .collect();
1450
1451 let temporary_oids = state.get_temporary_oids().collect();
1452
1453 if item.is_temporary() {
1454 if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1455 || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1456 {
1457 return Err(AdapterError::Catalog(Error::new(
1458 ErrorKind::InvalidTemporarySchema,
1459 )));
1460 }
1461 let oid = tx.allocate_oid(&temporary_oids)?;
1462
1463 let schema_id = name.qualifiers.schema_spec.clone().into();
1464 let item_type = item.typ();
1465 let (create_sql, global_id, versions) = item.to_serialized();
1466
1467 let item = TemporaryItem {
1468 id,
1469 oid,
1470 global_id,
1471 schema_id,
1472 name: name.item.clone(),
1473 create_sql,
1474 conn_id: item.conn_id().cloned(),
1475 owner_id,
1476 privileges: privileges.clone(),
1477 extra_versions: versions,
1478 };
1479 temporary_item_updates.push((item, StateDiff::Addition));
1480
1481 info!(
1482 "create temporary {} {} ({})",
1483 item_type,
1484 state.resolve_full_name(&name, None),
1485 id
1486 );
1487 } else {
1488 if let Some(temp_id) =
1489 item.uses()
1490 .iter()
1491 .find(|id| match state.try_get_entry(*id) {
1492 Some(entry) => entry.item().is_temporary(),
1493 None => temporary_ids.contains(id),
1494 })
1495 {
1496 let temp_item = state.get_entry(temp_id);
1497 return Err(AdapterError::Catalog(Error::new(
1498 ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1499 )));
1500 }
1501 if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1502 && !system_user
1503 {
1504 let schema_name = state
1505 .resolve_full_name(&name, session.map(|session| session.conn_id()))
1506 .schema;
1507 return Err(AdapterError::Catalog(Error::new(
1508 ErrorKind::ReadOnlySystemSchema(schema_name),
1509 )));
1510 }
1511 let schema_id = name.qualifiers.schema_spec.clone().into();
1512 let item_type = item.typ();
1513 let (create_sql, global_id, versions) = item.to_serialized();
1514 tx.insert_user_item(
1515 id,
1516 global_id,
1517 schema_id,
1518 &name.item,
1519 create_sql,
1520 owner_id,
1521 privileges.clone(),
1522 &temporary_oids,
1523 versions,
1524 )?;
1525 info!(
1526 "create {} {} ({})",
1527 item_type,
1528 state.resolve_full_name(&name, None),
1529 id
1530 );
1531 }
1532
1533 if Self::should_audit_log_item(&item) {
1534 let name = Self::full_name_detail(
1535 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1536 );
1537 let details = match &item {
1538 CatalogItem::Source(s) => {
1539 let cluster_id = match s.data_source {
1540 DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1543 match state.get_entry(&ingestion_id).cluster_id() {
1544 Some(cluster_id) => Some(cluster_id.to_string()),
1545 None => None,
1546 }
1547 }
1548 _ => match item.cluster_id() {
1549 Some(cluster_id) => Some(cluster_id.to_string()),
1550 None => None,
1551 },
1552 };
1553
1554 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1555 id: id.to_string(),
1556 cluster_id,
1557 name,
1558 external_type: s.source_type().to_string(),
1559 })
1560 }
1561 CatalogItem::Sink(s) => {
1562 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1563 id: id.to_string(),
1564 cluster_id: Some(s.cluster_id.to_string()),
1565 name,
1566 external_type: s.sink_type().to_string(),
1567 })
1568 }
1569 CatalogItem::Index(i) => {
1570 EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1571 id: id.to_string(),
1572 name,
1573 cluster_id: i.cluster_id.to_string(),
1574 })
1575 }
1576 CatalogItem::MaterializedView(mv) => {
1577 EventDetails::CreateMaterializedViewV1(
1578 mz_audit_log::CreateMaterializedViewV1 {
1579 id: id.to_string(),
1580 name,
1581 cluster_id: mv.cluster_id.to_string(),
1582 replacement_target_id: mv
1583 .replacement_target
1584 .map(|id| id.to_string()),
1585 },
1586 )
1587 }
1588 CatalogItem::Table(_)
1589 | CatalogItem::Log(_)
1590 | CatalogItem::View(_)
1591 | CatalogItem::Type(_)
1592 | CatalogItem::Func(_)
1593 | CatalogItem::Secret(_)
1594 | CatalogItem::Connection(_) => EventDetails::IdFullNameV1(IdFullNameV1 {
1595 id: id.to_string(),
1596 name,
1597 }),
1598 };
1599 CatalogState::add_to_audit_log(
1600 &state.system_configuration,
1601 oracle_write_ts,
1602 session,
1603 tx,
1604 audit_events,
1605 EventType::Create,
1606 catalog_type_to_audit_object_type(item.typ()),
1607 details,
1608 )?;
1609 }
1610 }
1611 Op::CreateNetworkPolicy {
1612 rules,
1613 name,
1614 owner_id,
1615 } => {
1616 if state.network_policies_by_name.contains_key(&name) {
1617 return Err(AdapterError::PlanError(PlanError::Catalog(
1618 SqlCatalogError::NetworkPolicyAlreadyExists(name),
1619 )));
1620 }
1621 if is_reserved_name(&name) {
1622 return Err(AdapterError::Catalog(Error::new(
1623 ErrorKind::ReservedNetworkPolicyName(name),
1624 )));
1625 }
1626
1627 let owner_privileges = vec![rbac::owner_privilege(
1628 mz_sql::catalog::ObjectType::NetworkPolicy,
1629 owner_id,
1630 )];
1631 let default_privileges = state
1632 .default_privileges
1633 .get_applicable_privileges(
1634 owner_id,
1635 None,
1636 None,
1637 mz_sql::catalog::ObjectType::NetworkPolicy,
1638 )
1639 .map(|item| item.mz_acl_item(owner_id));
1640 let privileges: Vec<_> =
1641 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1642 .collect();
1643
1644 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1645 let id = tx.insert_user_network_policy(
1646 name.clone(),
1647 rules,
1648 privileges,
1649 owner_id,
1650 &temporary_oids,
1651 )?;
1652
1653 CatalogState::add_to_audit_log(
1654 &state.system_configuration,
1655 oracle_write_ts,
1656 session,
1657 tx,
1658 audit_events,
1659 EventType::Create,
1660 ObjectType::NetworkPolicy,
1661 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1662 id: id.to_string(),
1663 name: name.clone(),
1664 }),
1665 )?;
1666
1667 info!("created network policy {name} ({id})");
1668 }
1669 Op::Comment {
1670 object_id,
1671 sub_component,
1672 comment,
1673 } => {
1674 tx.update_comment(object_id, sub_component, comment)?;
1675 let entry = state.get_comment_id_entry(&object_id);
1676 let should_log = entry
1677 .map(|entry| Self::should_audit_log_item(entry.item()))
1678 .unwrap_or(true);
1680 if let (Some(conn_id), true) =
1683 (session.map(|session| session.conn_id()), should_log)
1684 {
1685 CatalogState::add_to_audit_log(
1686 &state.system_configuration,
1687 oracle_write_ts,
1688 session,
1689 tx,
1690 audit_events,
1691 EventType::Comment,
1692 comment_id_to_audit_object_type(object_id),
1693 EventDetails::IdNameV1(IdNameV1 {
1694 id: format!("{object_id:?}"),
1696 name: state.comment_id_to_audit_log_name(object_id, conn_id),
1697 }),
1698 )?;
1699 }
1700 }
1701 Op::UpdateSourceReferences {
1702 source_id,
1703 references,
1704 } => {
1705 tx.update_source_references(
1706 source_id,
1707 references
1708 .references
1709 .into_iter()
1710 .map(|reference| reference.into())
1711 .collect(),
1712 references.updated_at,
1713 )?;
1714 }
1715 Op::DropObjects(drop_object_infos) => {
1716 let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1718
1719 tx.drop_comments(&delta.comments)?;
1721
1722 let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1724 delta
1725 .items
1726 .iter()
1727 .map(|id| id)
1728 .partition(|id| !state.get_entry(*id).item().is_temporary());
1729 tx.remove_items(&durable_items_to_drop)?;
1730 temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1731 let entry = state.get_entry(&id);
1732 (entry.clone().into(), StateDiff::Retraction)
1733 }));
1734
1735 for item_id in delta.items {
1736 let entry = state.get_entry(&item_id);
1737
1738 if entry.item().is_storage_collection() {
1739 storage_collections_to_drop.extend(entry.global_ids());
1740 }
1741
1742 if state.source_references.contains_key(&item_id) {
1743 tx.remove_source_references(item_id)?;
1744 }
1745
1746 if Self::should_audit_log_item(entry.item()) {
1747 CatalogState::add_to_audit_log(
1748 &state.system_configuration,
1749 oracle_write_ts,
1750 session,
1751 tx,
1752 audit_events,
1753 EventType::Drop,
1754 catalog_type_to_audit_object_type(entry.item().typ()),
1755 EventDetails::IdFullNameV1(IdFullNameV1 {
1756 id: item_id.to_string(),
1757 name: Self::full_name_detail(&state.resolve_full_name(
1758 entry.name(),
1759 session.map(|session| session.conn_id()),
1760 )),
1761 }),
1762 )?;
1763 }
1764 info!(
1765 "drop {} {} ({})",
1766 entry.item_type(),
1767 state.resolve_full_name(entry.name(), entry.conn_id()),
1768 item_id
1769 );
1770 }
1771
1772 let schemas = delta
1774 .schemas
1775 .iter()
1776 .map(|(schema_spec, database_spec)| {
1777 (SchemaId::from(schema_spec), *database_spec)
1778 })
1779 .collect();
1780 tx.remove_schemas(&schemas)?;
1781
1782 for (schema_spec, database_spec) in delta.schemas {
1783 let schema = state.get_schema(
1784 &database_spec,
1785 &schema_spec,
1786 session
1787 .map(|session| session.conn_id())
1788 .unwrap_or(&SYSTEM_CONN_ID),
1789 );
1790
1791 let schema_id = SchemaId::from(schema_spec);
1792 let database_id = match database_spec {
1793 ResolvedDatabaseSpecifier::Ambient => None,
1794 ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1795 };
1796
1797 CatalogState::add_to_audit_log(
1798 &state.system_configuration,
1799 oracle_write_ts,
1800 session,
1801 tx,
1802 audit_events,
1803 EventType::Drop,
1804 ObjectType::Schema,
1805 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1806 id: schema_id.to_string(),
1807 name: schema.name.schema.to_string(),
1808 database_name: database_id
1809 .map(|database_id| state.database_by_id[&database_id].name.clone()),
1810 }),
1811 )?;
1812 }
1813
1814 tx.remove_databases(&delta.databases)?;
1816
1817 for database_id in delta.databases {
1818 let database = state.get_database(&database_id).clone();
1819
1820 CatalogState::add_to_audit_log(
1821 &state.system_configuration,
1822 oracle_write_ts,
1823 session,
1824 tx,
1825 audit_events,
1826 EventType::Drop,
1827 ObjectType::Database,
1828 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1829 id: database_id.to_string(),
1830 name: database.name.clone(),
1831 }),
1832 )?;
1833 }
1834
1835 tx.remove_user_roles(&delta.roles)?;
1837
1838 for role_id in delta.roles {
1839 let role = state
1840 .roles_by_id
1841 .get(&role_id)
1842 .expect("catalog out of sync");
1843
1844 CatalogState::add_to_audit_log(
1845 &state.system_configuration,
1846 oracle_write_ts,
1847 session,
1848 tx,
1849 audit_events,
1850 EventType::Drop,
1851 ObjectType::Role,
1852 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1853 id: role.id.to_string(),
1854 name: role.name.clone(),
1855 }),
1856 )?;
1857 info!("drop role {}", role.name());
1858 }
1859
1860 tx.remove_network_policies(&delta.network_policies)?;
1862
1863 for network_policy_id in delta.network_policies {
1864 let policy = state
1865 .network_policies_by_id
1866 .get(&network_policy_id)
1867 .expect("catalog out of sync");
1868
1869 CatalogState::add_to_audit_log(
1870 &state.system_configuration,
1871 oracle_write_ts,
1872 session,
1873 tx,
1874 audit_events,
1875 EventType::Drop,
1876 ObjectType::NetworkPolicy,
1877 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1878 id: policy.id.to_string(),
1879 name: policy.name.clone(),
1880 }),
1881 )?;
1882 info!("drop network policy {}", policy.name.clone());
1883 }
1884
1885 let replicas = delta.replicas.keys().copied().collect();
1887 tx.remove_cluster_replicas(&replicas)?;
1888
1889 for (replica_id, (cluster_id, reason)) in delta.replicas {
1890 let cluster = state.get_cluster(cluster_id);
1891 let replica = cluster.replica(replica_id).expect("Must exist");
1892
1893 let (reason, scheduling_policies) = reason.into_audit_log();
1894 let details =
1895 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1896 cluster_id: cluster_id.to_string(),
1897 cluster_name: cluster.name.clone(),
1898 replica_id: Some(replica_id.to_string()),
1899 replica_name: replica.name.clone(),
1900 reason,
1901 scheduling_policies,
1902 });
1903 CatalogState::add_to_audit_log(
1904 &state.system_configuration,
1905 oracle_write_ts,
1906 session,
1907 tx,
1908 audit_events,
1909 EventType::Drop,
1910 ObjectType::ClusterReplica,
1911 details,
1912 )?;
1913 }
1914
1915 tx.remove_clusters(&delta.clusters)?;
1917
1918 for cluster_id in delta.clusters {
1919 let cluster = state.get_cluster(cluster_id);
1920
1921 CatalogState::add_to_audit_log(
1922 &state.system_configuration,
1923 oracle_write_ts,
1924 session,
1925 tx,
1926 audit_events,
1927 EventType::Drop,
1928 ObjectType::Cluster,
1929 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1930 id: cluster.id.to_string(),
1931 name: cluster.name.clone(),
1932 }),
1933 )?;
1934 }
1935 }
1936 Op::GrantRole {
1937 role_id,
1938 member_id,
1939 grantor_id,
1940 } => {
1941 state.ensure_not_reserved_role(&member_id)?;
1942 state.ensure_grantable_role(&role_id)?;
1943 if state.collect_role_membership(&role_id).contains(&member_id) {
1944 let group_role = state.get_role(&role_id);
1945 let member_role = state.get_role(&member_id);
1946 return Err(AdapterError::Catalog(Error::new(
1947 ErrorKind::CircularRoleMembership {
1948 role_name: group_role.name().to_string(),
1949 member_name: member_role.name().to_string(),
1950 },
1951 )));
1952 }
1953 let mut member_role = state.get_role(&member_id).clone();
1954 member_role.membership.map.insert(role_id, grantor_id);
1955 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1956
1957 CatalogState::add_to_audit_log(
1958 &state.system_configuration,
1959 oracle_write_ts,
1960 session,
1961 tx,
1962 audit_events,
1963 EventType::Grant,
1964 ObjectType::Role,
1965 EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1966 role_id: role_id.to_string(),
1967 member_id: member_id.to_string(),
1968 grantor_id: grantor_id.to_string(),
1969 executed_by: session
1970 .map(|session| session.authenticated_role_id())
1971 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1972 .to_string(),
1973 }),
1974 )?;
1975 }
1976 Op::RevokeRole {
1977 role_id,
1978 member_id,
1979 grantor_id,
1980 } => {
1981 state.ensure_not_reserved_role(&member_id)?;
1982 state.ensure_grantable_role(&role_id)?;
1983 let mut member_role = state.get_role(&member_id).clone();
1984 member_role.membership.map.remove(&role_id);
1985 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1986
1987 CatalogState::add_to_audit_log(
1988 &state.system_configuration,
1989 oracle_write_ts,
1990 session,
1991 tx,
1992 audit_events,
1993 EventType::Revoke,
1994 ObjectType::Role,
1995 EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1996 role_id: role_id.to_string(),
1997 member_id: member_id.to_string(),
1998 grantor_id: grantor_id.to_string(),
1999 executed_by: session
2000 .map(|session| session.authenticated_role_id())
2001 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
2002 .to_string(),
2003 }),
2004 )?;
2005 }
2006 Op::UpdatePrivilege {
2007 target_id,
2008 privilege,
2009 variant,
2010 } => {
2011 let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
2012 UpdatePrivilegeVariant::Grant => {
2013 privileges.grant(privilege);
2014 }
2015 UpdatePrivilegeVariant::Revoke => {
2016 privileges.revoke(&privilege);
2017 }
2018 };
2019 match &target_id {
2020 SystemObjectId::Object(object_id) => match object_id {
2021 ObjectId::Cluster(id) => {
2022 let mut cluster = state.get_cluster(*id).clone();
2023 update_privilege_fn(&mut cluster.privileges);
2024 tx.update_cluster(*id, cluster.into())?;
2025 }
2026 ObjectId::Database(id) => {
2027 let mut database = state.get_database(id).clone();
2028 update_privilege_fn(&mut database.privileges);
2029 tx.update_database(*id, database.into())?;
2030 }
2031 ObjectId::NetworkPolicy(id) => {
2032 let mut policy = state.get_network_policy(id).clone();
2033 update_privilege_fn(&mut policy.privileges);
2034 tx.update_network_policy(*id, policy.into())?;
2035 }
2036 ObjectId::Schema((database_spec, schema_spec)) => {
2037 let schema_id = schema_spec.clone().into();
2038 let mut schema = state
2039 .get_schema(
2040 database_spec,
2041 schema_spec,
2042 session
2043 .map(|session| session.conn_id())
2044 .unwrap_or(&SYSTEM_CONN_ID),
2045 )
2046 .clone();
2047 update_privilege_fn(&mut schema.privileges);
2048 tx.update_schema(schema_id, schema.into())?;
2049 }
2050 ObjectId::Item(id) => {
2051 let entry = state.get_entry(id);
2052 let mut new_entry = entry.clone();
2053 update_privilege_fn(&mut new_entry.privileges);
2054 if !new_entry.item().is_temporary() {
2055 tx.update_item(*id, new_entry.into())?;
2056 } else {
2057 temporary_item_updates
2058 .push((entry.clone().into(), StateDiff::Retraction));
2059 temporary_item_updates
2060 .push((new_entry.into(), StateDiff::Addition));
2061 }
2062 }
2063 ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
2064 },
2065 SystemObjectId::System => {
2066 let mut system_privileges = PrivilegeMap::clone(&state.system_privileges);
2067 update_privilege_fn(&mut system_privileges);
2068 let new_privilege =
2069 system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
2070 tx.set_system_privilege(
2071 privilege.grantee,
2072 privilege.grantor,
2073 new_privilege.map(|new_privilege| new_privilege.acl_mode),
2074 )?;
2075 }
2076 }
2077 let object_type = state.get_system_object_type(&target_id);
2078 let object_id_str = match &target_id {
2079 SystemObjectId::System => "SYSTEM".to_string(),
2080 SystemObjectId::Object(id) => id.to_string(),
2081 };
2082 CatalogState::add_to_audit_log(
2083 &state.system_configuration,
2084 oracle_write_ts,
2085 session,
2086 tx,
2087 audit_events,
2088 variant.into(),
2089 system_object_type_to_audit_object_type(&object_type),
2090 EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
2091 object_id: object_id_str,
2092 grantee_id: privilege.grantee.to_string(),
2093 grantor_id: privilege.grantor.to_string(),
2094 privileges: privilege.acl_mode.to_string(),
2095 }),
2096 )?;
2097 }
2098 Op::UpdateDefaultPrivilege {
2099 privilege_object,
2100 privilege_acl_item,
2101 variant,
2102 } => {
2103 let mut default_privileges = DefaultPrivileges::clone(&state.default_privileges);
2104 match variant {
2105 UpdatePrivilegeVariant::Grant => default_privileges
2106 .grant(privilege_object.clone(), privilege_acl_item.clone()),
2107 UpdatePrivilegeVariant::Revoke => {
2108 default_privileges.revoke(&privilege_object, &privilege_acl_item)
2109 }
2110 }
2111 let new_acl_mode = default_privileges
2112 .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
2113 tx.set_default_privilege(
2114 privilege_object.role_id,
2115 privilege_object.database_id,
2116 privilege_object.schema_id,
2117 privilege_object.object_type,
2118 privilege_acl_item.grantee,
2119 new_acl_mode.cloned(),
2120 )?;
2121 CatalogState::add_to_audit_log(
2122 &state.system_configuration,
2123 oracle_write_ts,
2124 session,
2125 tx,
2126 audit_events,
2127 variant.into(),
2128 object_type_to_audit_object_type(privilege_object.object_type),
2129 EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
2130 role_id: privilege_object.role_id.to_string(),
2131 database_id: privilege_object.database_id.map(|id| id.to_string()),
2132 schema_id: privilege_object.schema_id.map(|id| id.to_string()),
2133 grantee_id: privilege_acl_item.grantee.to_string(),
2134 privileges: privilege_acl_item.acl_mode.to_string(),
2135 }),
2136 )?;
2137 }
2138 Op::RenameCluster {
2139 id,
2140 name,
2141 to_name,
2142 check_reserved_names,
2143 } => {
2144 if id.is_system() {
2145 return Err(AdapterError::Catalog(Error::new(
2146 ErrorKind::ReadOnlyCluster(name.clone()),
2147 )));
2148 }
2149 if check_reserved_names && is_reserved_name(&to_name) {
2150 return Err(AdapterError::Catalog(Error::new(
2151 ErrorKind::ReservedClusterName(to_name),
2152 )));
2153 }
2154 tx.rename_cluster(id, &name, &to_name)?;
2155 CatalogState::add_to_audit_log(
2156 &state.system_configuration,
2157 oracle_write_ts,
2158 session,
2159 tx,
2160 audit_events,
2161 EventType::Alter,
2162 ObjectType::Cluster,
2163 EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
2164 id: id.to_string(),
2165 old_name: name.clone(),
2166 new_name: to_name.clone(),
2167 }),
2168 )?;
2169 info!("rename cluster {name} to {to_name}");
2170 }
2171 Op::RenameClusterReplica {
2172 cluster_id,
2173 replica_id,
2174 name,
2175 to_name,
2176 } => {
2177 if is_reserved_name(&to_name) {
2178 return Err(AdapterError::Catalog(Error::new(
2179 ErrorKind::ReservedReplicaName(to_name),
2180 )));
2181 }
2182 tx.rename_cluster_replica(replica_id, &name, &to_name)?;
2183 CatalogState::add_to_audit_log(
2184 &state.system_configuration,
2185 oracle_write_ts,
2186 session,
2187 tx,
2188 audit_events,
2189 EventType::Alter,
2190 ObjectType::ClusterReplica,
2191 EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
2192 cluster_id: cluster_id.to_string(),
2193 replica_id: replica_id.to_string(),
2194 old_name: name.replica.as_str().to_string(),
2195 new_name: to_name.clone(),
2196 }),
2197 )?;
2198 info!("rename cluster replica {name} to {to_name}");
2199 }
2200 Op::RenameItem {
2201 id,
2202 to_name,
2203 current_full_name,
2204 } => {
2205 let mut updates = Vec::new();
2206
2207 let entry = state.get_entry(&id);
2208 if let CatalogItem::Type(_) = entry.item() {
2209 return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
2210 current_full_name.to_string(),
2211 ))));
2212 }
2213
2214 if entry.id().is_system() {
2215 let name = state
2216 .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2217 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2218 name.to_string(),
2219 ))));
2220 }
2221
2222 let mut to_full_name = current_full_name.clone();
2223 to_full_name.item.clone_from(&to_name);
2224
2225 let mut to_qualified_name = entry.name().clone();
2226 to_qualified_name.item.clone_from(&to_name);
2227
2228 let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2229 id: id.to_string(),
2230 old_name: Self::full_name_detail(¤t_full_name),
2231 new_name: Self::full_name_detail(&to_full_name),
2232 });
2233 if Self::should_audit_log_item(entry.item()) {
2234 CatalogState::add_to_audit_log(
2235 &state.system_configuration,
2236 oracle_write_ts,
2237 session,
2238 tx,
2239 audit_events,
2240 EventType::Alter,
2241 catalog_type_to_audit_object_type(entry.item().typ()),
2242 details,
2243 )?;
2244 }
2245
2246 let mut new_entry = entry.clone();
2248 new_entry.name.item.clone_from(&to_name);
2249 new_entry.item = entry
2250 .item()
2251 .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2252 .map_err(|e| {
2253 Error::new(ErrorKind::from(AmbiguousRename {
2254 depender: state
2255 .resolve_full_name(entry.name(), entry.conn_id())
2256 .to_string(),
2257 dependee: state
2258 .resolve_full_name(entry.name(), entry.conn_id())
2259 .to_string(),
2260 message: e,
2261 }))
2262 })?;
2263
2264 for id in entry.referenced_by() {
2265 let dependent_item = state.get_entry(id);
2266 let mut to_entry = dependent_item.clone();
2267 to_entry.item = dependent_item
2268 .item()
2269 .rename_item_refs(
2270 current_full_name.clone(),
2271 to_full_name.item.clone(),
2272 false,
2273 )
2274 .map_err(|e| {
2275 Error::new(ErrorKind::from(AmbiguousRename {
2276 depender: state
2277 .resolve_full_name(
2278 dependent_item.name(),
2279 dependent_item.conn_id(),
2280 )
2281 .to_string(),
2282 dependee: state
2283 .resolve_full_name(entry.name(), entry.conn_id())
2284 .to_string(),
2285 message: e,
2286 }))
2287 })?;
2288
2289 if !to_entry.item().is_temporary() {
2290 tx.update_item(*id, to_entry.into())?;
2291 } else {
2292 temporary_item_updates
2293 .push((dependent_item.clone().into(), StateDiff::Retraction));
2294 temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2295 }
2296 updates.push(*id);
2297 }
2298 if !new_entry.item().is_temporary() {
2299 tx.update_item(id, new_entry.into())?;
2300 } else {
2301 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2302 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2303 }
2304
2305 updates.push(id);
2306 for id in updates {
2307 Self::log_update(state, &id);
2308 }
2309 }
// Renames a schema: rewrites schema references in every item the schema
// contains and in every item that references one of those, emits an audit
// event, and persists the schema under its new name.
Op::RenameSchema {
    database_spec,
    schema_spec,
    new_name,
    check_reserved_names,
} => {
    // The reserved-name check on the new name is opt-in per operation.
    if check_reserved_names && is_reserved_name(&new_name) {
        return Err(AdapterError::Catalog(Error::new(
            ErrorKind::ReservedSchemaName(new_name),
        )));
    }

    // Fall back to the system connection when there is no session.
    let conn_id = session
        .map(|session| session.conn_id())
        .unwrap_or(&SYSTEM_CONN_ID);

    let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
    let cur_name = schema.name().schema.clone();

    // Schemas of the ambient database cannot be renamed.
    let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
        return Err(AdapterError::Catalog(Error::new(
            ErrorKind::AmbientSchemaRename(cur_name),
        )));
    };
    let database = state.get_database(&database_id);
    let database_name = &database.name;

    // `updates` collects the ids handed to `log_update` at the end;
    // `items_to_update` collects the durable items to write in one batch.
    let mut updates = Vec::new();
    let mut items_to_update = BTreeMap::new();

    // Rewrites schema references inside a single item. Items already
    // present in `items_to_update` are skipped.
    // NOTE(review): temporary items are never inserted into
    // `items_to_update`, so this early return does not deduplicate them —
    // confirm whether a temporary item can be visited more than once.
    let mut update_item = |id| {
        if items_to_update.contains_key(id) {
            return Ok(());
        }

        let entry = state.get_entry(id);

        let mut new_entry = entry.clone();
        new_entry.item = entry
            .item
            .rename_schema_refs(database_name, &cur_name, &new_name)
            .map_err(|(s, _i)| {
                Error::new(ErrorKind::from(AmbiguousRename {
                    depender: state
                        .resolve_full_name(entry.name(), entry.conn_id())
                        .to_string(),
                    dependee: format!("{database_name}.{cur_name}"),
                    message: format!("ambiguous reference to schema named {s}"),
                }))
            })?;

        // Durable items are batched for `tx.update_items`; temporary items
        // are replaced via a retraction plus an addition.
        if !new_entry.item().is_temporary() {
            items_to_update.insert(*id, new_entry.into());
        } else {
            temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
            temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
        }
        updates.push(id);

        Ok::<_, AdapterError>(())
    };

    for (_name, item_id) in &schema.items {
        // Update the item that lives in the schema...
        update_item(item_id)?;

        // ...and every item that references it.
        for id in state.get_entry(item_id).referenced_by() {
            update_item(id)?;
        }
    }
    tx.update_items(items_to_update)?;

    // Only schemas identified by `SchemaSpecifier::Id` can be renamed; any
    // other specifier is reported as a read-only system schema.
    let SchemaSpecifier::Id(schema_id) = *schema.id() else {
        let schema_name = schema.name().schema.clone();
        return Err(AdapterError::Catalog(crate::catalog::Error::new(
            crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
        )));
    };

    let database_name = database_spec
        .id()
        .map(|id| state.get_database(&id).name.clone());
    let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
        id: schema_id.to_string(),
        old_name: schema.name().schema.clone(),
        new_name: new_name.clone(),
        database_name,
    });
    CatalogState::add_to_audit_log(
        &state.system_configuration,
        oracle_write_ts,
        session,
        tx,
        audit_events,
        EventType::Alter,
        mz_audit_log::ObjectType::Schema,
        details,
    )?;

    // Persist the schema itself under its new name.
    let mut new_schema = schema.clone();
    new_schema.name.schema.clone_from(&new_name);
    tx.update_schema(schema_id, new_schema.into())?;

    for id in updates {
        Self::log_update(state, id);
    }
}
2426 Op::UpdateOwner { id, new_owner } => {
2427 let conn_id = session
2428 .map(|session| session.conn_id())
2429 .unwrap_or(&SYSTEM_CONN_ID);
2430 let old_owner = state
2431 .get_owner_id(&id, conn_id)
2432 .expect("cannot update the owner of an object without an owner");
2433 match &id {
2434 ObjectId::Cluster(id) => {
2435 let mut cluster = state.get_cluster(*id).clone();
2436 if id.is_system() {
2437 return Err(AdapterError::Catalog(Error::new(
2438 ErrorKind::ReadOnlyCluster(cluster.name),
2439 )));
2440 }
2441 Self::update_privilege_owners(
2442 &mut cluster.privileges,
2443 cluster.owner_id,
2444 new_owner,
2445 );
2446 cluster.owner_id = new_owner;
2447 tx.update_cluster(*id, cluster.into())?;
2448 }
2449 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2450 let cluster = state.get_cluster(*cluster_id);
2451 let mut replica = cluster
2452 .replica(*replica_id)
2453 .expect("catalog out of sync")
2454 .clone();
2455 if replica_id.is_system() {
2456 return Err(AdapterError::Catalog(Error::new(
2457 ErrorKind::ReadOnlyClusterReplica(replica.name),
2458 )));
2459 }
2460 replica.owner_id = new_owner;
2461 tx.update_cluster_replica(*replica_id, replica.into())?;
2462 }
2463 ObjectId::Database(id) => {
2464 let mut database = state.get_database(id).clone();
2465 if id.is_system() {
2466 return Err(AdapterError::Catalog(Error::new(
2467 ErrorKind::ReadOnlyDatabase(database.name),
2468 )));
2469 }
2470 Self::update_privilege_owners(
2471 &mut database.privileges,
2472 database.owner_id,
2473 new_owner,
2474 );
2475 database.owner_id = new_owner;
2476 tx.update_database(*id, database.clone().into())?;
2477 }
2478 ObjectId::Schema((database_spec, schema_spec)) => {
2479 let schema_id: SchemaId = schema_spec.clone().into();
2480 let mut schema = state
2481 .get_schema(database_spec, schema_spec, conn_id)
2482 .clone();
2483 if schema_id.is_system() {
2484 let name = schema.name();
2485 let full_name = state.resolve_full_schema_name(name);
2486 return Err(AdapterError::Catalog(Error::new(
2487 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2488 )));
2489 }
2490 Self::update_privilege_owners(
2491 &mut schema.privileges,
2492 schema.owner_id,
2493 new_owner,
2494 );
2495 schema.owner_id = new_owner;
2496 tx.update_schema(schema_id, schema.into())?;
2497 }
2498 ObjectId::Item(id) => {
2499 let entry = state.get_entry(id);
2500 let mut new_entry = entry.clone();
2501 if id.is_system() {
2502 let full_name = state.resolve_full_name(
2503 new_entry.name(),
2504 session.map(|session| session.conn_id()),
2505 );
2506 return Err(AdapterError::Catalog(Error::new(
2507 ErrorKind::ReadOnlyItem(full_name.to_string()),
2508 )));
2509 }
2510 Self::update_privilege_owners(
2511 &mut new_entry.privileges,
2512 new_entry.owner_id,
2513 new_owner,
2514 );
2515 new_entry.owner_id = new_owner;
2516 if !new_entry.item().is_temporary() {
2517 tx.update_item(*id, new_entry.into())?;
2518 } else {
2519 temporary_item_updates
2520 .push((entry.clone().into(), StateDiff::Retraction));
2521 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2522 }
2523 }
2524 ObjectId::NetworkPolicy(id) => {
2525 let mut policy = state.get_network_policy(id).clone();
2526 if id.is_system() {
2527 return Err(AdapterError::Catalog(Error::new(
2528 ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2529 )));
2530 }
2531 Self::update_privilege_owners(
2532 &mut policy.privileges,
2533 policy.owner_id,
2534 new_owner,
2535 );
2536 policy.owner_id = new_owner;
2537 tx.update_network_policy(*id, policy.into())?;
2538 }
2539 ObjectId::Role(_) => unreachable!("roles have no owner"),
2540 }
2541 let object_type = state.get_object_type(&id);
2542 CatalogState::add_to_audit_log(
2543 &state.system_configuration,
2544 oracle_write_ts,
2545 session,
2546 tx,
2547 audit_events,
2548 EventType::Alter,
2549 object_type_to_audit_object_type(object_type),
2550 EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2551 object_id: id.to_string(),
2552 old_owner_id: old_owner.to_string(),
2553 new_owner_id: new_owner.to_string(),
2554 }),
2555 )?;
2556 }
2557 Op::UpdateClusterConfig { id, name, config } => {
2558 let mut cluster = state.get_cluster(id).clone();
2559 cluster.config = config;
2560 tx.update_cluster(id, cluster.into())?;
2561 info!("update cluster {}", name);
2562
2563 CatalogState::add_to_audit_log(
2564 &state.system_configuration,
2565 oracle_write_ts,
2566 session,
2567 tx,
2568 audit_events,
2569 EventType::Alter,
2570 ObjectType::Cluster,
2571 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2572 id: id.to_string(),
2573 name,
2574 }),
2575 )?;
2576 }
2577 Op::UpdateClusterReplicaConfig {
2578 replica_id,
2579 cluster_id,
2580 config,
2581 } => {
2582 let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2583 info!("update replica {}", replica.name);
2584 tx.update_cluster_replica(
2585 replica_id,
2586 mz_catalog::durable::ClusterReplica {
2587 cluster_id,
2588 replica_id,
2589 name: replica.name.clone(),
2590 config: config.clone().into(),
2591 owner_id: replica.owner_id,
2592 },
2593 )?;
2594 }
2595 Op::UpdateItem { id, name, to_item } => {
2596 let mut entry = state.get_entry(&id).clone();
2597 entry.name = name.clone();
2598 entry.item = to_item.clone();
2599 tx.update_item(id, entry.into())?;
2600
2601 if Self::should_audit_log_item(&to_item) {
2602 let mut full_name = Self::full_name_detail(
2603 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2604 );
2605 full_name.item = name.item;
2606
2607 CatalogState::add_to_audit_log(
2608 &state.system_configuration,
2609 oracle_write_ts,
2610 session,
2611 tx,
2612 audit_events,
2613 EventType::Alter,
2614 catalog_type_to_audit_object_type(to_item.typ()),
2615 EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2616 id: id.to_string(),
2617 name: full_name,
2618 }),
2619 )?;
2620 }
2621
2622 Self::log_update(state, &id);
2623 }
2624 Op::UpdateSystemConfiguration { name, value } => {
2625 let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2626 tx.upsert_system_config(&name, parsed_value.clone())?;
2627 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2632 let with_0dt_deployment_max_wait =
2633 Duration::parse(VarInput::Flat(&parsed_value))
2634 .expect("parsing succeeded above");
2635 tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2636 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2637 let with_0dt_deployment_ddl_check_interval =
2638 Duration::parse(VarInput::Flat(&parsed_value))
2639 .expect("parsing succeeded above");
2640 tx.set_0dt_deployment_ddl_check_interval(
2641 with_0dt_deployment_ddl_check_interval,
2642 )?;
2643 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2644 let panic_after_timeout =
2645 strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2646 tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2647 }
2648
2649 CatalogState::add_to_audit_log(
2650 &state.system_configuration,
2651 oracle_write_ts,
2652 session,
2653 tx,
2654 audit_events,
2655 EventType::Alter,
2656 ObjectType::System,
2657 EventDetails::SetV1(mz_audit_log::SetV1 {
2658 name,
2659 value: Some(value.borrow().to_vec().join(", ")),
2660 }),
2661 )?;
2662 }
2663 Op::ResetSystemConfiguration { name } => {
2664 tx.remove_system_config(&name);
2665 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2670 tx.reset_0dt_deployment_max_wait()?;
2671 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2672 tx.reset_0dt_deployment_ddl_check_interval()?;
2673 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2674 tx.reset_enable_0dt_deployment_panic_after_timeout()?;
2675 }
2676
2677 CatalogState::add_to_audit_log(
2678 &state.system_configuration,
2679 oracle_write_ts,
2680 session,
2681 tx,
2682 audit_events,
2683 EventType::Alter,
2684 ObjectType::System,
2685 EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2686 )?;
2687 }
2688 Op::ResetAllSystemConfiguration => {
2689 tx.clear_system_configs();
2690 tx.reset_0dt_deployment_max_wait()?;
2691 tx.reset_0dt_deployment_ddl_check_interval()?;
2692 tx.reset_enable_0dt_deployment_panic_after_timeout()?;
2693
2694 CatalogState::add_to_audit_log(
2695 &state.system_configuration,
2696 oracle_write_ts,
2697 session,
2698 tx,
2699 audit_events,
2700 EventType::Alter,
2701 ObjectType::System,
2702 EventDetails::ResetAllV1,
2703 )?;
2704 }
2705 Op::UpdateScopedSystemParameters { scoped } => {
2706 let live_clusters: BTreeSet<ClusterId> =
2712 tx.get_clusters().map(|cluster| cluster.id).collect();
2713 let live_replicas: BTreeSet<ReplicaId> = tx
2714 .get_cluster_replicas()
2715 .map(|replica| replica.replica_id)
2716 .collect();
2717
2718 let existing_cluster: BTreeMap<(ClusterId, String), String> = tx
2720 .get_cluster_system_configurations()
2721 .map(|c| ((c.cluster_id, c.name), c.value))
2722 .collect();
2723 let mut desired_cluster: BTreeSet<(ClusterId, String)> = BTreeSet::new();
2724 for (cluster_id, values) in &scoped.cluster {
2725 if !live_clusters.contains(cluster_id) {
2726 continue;
2727 }
2728 for (name, value) in values {
2729 desired_cluster.insert((*cluster_id, name.clone()));
2730 if existing_cluster.get(&(*cluster_id, name.clone())) != Some(value) {
2731 tx.upsert_cluster_system_config(*cluster_id, name, value.clone())?;
2732 }
2733 }
2734 }
2735 for (cluster_id, name) in existing_cluster.into_keys() {
2738 if !desired_cluster.contains(&(cluster_id, name.clone())) {
2739 tx.remove_cluster_system_config(cluster_id, &name);
2740 }
2741 }
2742
2743 let existing_replica: BTreeMap<(ReplicaId, String), String> = tx
2745 .get_replica_system_configurations()
2746 .map(|r| ((r.replica_id, r.name), r.value))
2747 .collect();
2748 let mut desired_replica: BTreeSet<(ReplicaId, String)> = BTreeSet::new();
2749 for (replica_id, values) in &scoped.replica {
2750 if !live_replicas.contains(replica_id) {
2751 continue;
2752 }
2753 for (name, value) in values {
2754 desired_replica.insert((*replica_id, name.clone()));
2755 if existing_replica.get(&(*replica_id, name.clone())) != Some(value) {
2756 tx.upsert_replica_system_config(*replica_id, name, value.clone())?;
2757 }
2758 }
2759 }
2760 for (replica_id, name) in existing_replica.into_keys() {
2761 if !desired_replica.contains(&(replica_id, name.clone())) {
2762 tx.remove_replica_system_config(replica_id, &name);
2763 }
2764 }
2765 }
2766 Op::InjectAuditEvents { events } => {
2767 for event in events {
2768 let id = tx.allocate_audit_log_id()?;
2769 let ev = VersionedEvent::new(
2770 id,
2771 event.event_type,
2772 event.object_type,
2773 event.details,
2774 event.user,
2775 oracle_write_ts.into(),
2776 );
2777 audit_events.push(ev.clone());
2778 tx.insert_audit_log_event(ev);
2779 }
2780 }
2781 };
2782 Ok(temporary_item_updates)
2783 }
2784
2785 fn log_update(state: &CatalogState, id: &CatalogItemId) {
2786 let entry = state.get_entry(id);
2787 info!(
2788 "update {} {} ({})",
2789 entry.item_type(),
2790 state.resolve_full_name(entry.name(), entry.conn_id()),
2791 id
2792 );
2793 }
2794
2795 fn update_privilege_owners(
2799 privileges: &mut PrivilegeMap,
2800 old_owner: RoleId,
2801 new_owner: RoleId,
2802 ) {
2803 let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2805
2806 let mut new_present = false;
2807 for privilege in flat_privileges.iter_mut() {
2808 if privilege.grantor == old_owner {
2811 privilege.grantor = new_owner;
2812 } else if privilege.grantor == new_owner {
2813 new_present = true;
2814 }
2815 if privilege.grantee == old_owner {
2817 privilege.grantee = new_owner;
2818 } else if privilege.grantee == new_owner {
2819 new_present = true;
2820 }
2821 }
2822
2823 if new_present {
2827 let privilege_map: BTreeMap<_, Vec<_>> =
2829 flat_privileges
2830 .into_iter()
2831 .fold(BTreeMap::new(), |mut accum, privilege| {
2832 accum
2833 .entry((privilege.grantee, privilege.grantor))
2834 .or_default()
2835 .push(privilege);
2836 accum
2837 });
2838
2839 flat_privileges = privilege_map
2841 .into_iter()
2842 .map(|((grantee, grantor), values)|
2843 values.into_iter().fold(
2845 MzAclItem::empty(grantee, grantor),
2846 |mut accum, mz_aclitem| {
2847 accum.acl_mode =
2848 accum.acl_mode.union(mz_aclitem.acl_mode);
2849 accum
2850 },
2851 ))
2852 .collect();
2853 }
2854
2855 *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2856 }
2857}
2858
2859fn tx_replace_item(
2868 tx: &mut Transaction<'_>,
2869 state: &CatalogState,
2870 id: CatalogItemId,
2871 new_entry: CatalogEntry,
2872) -> Result<(), AdapterError> {
2873 let new_id = new_entry.id;
2874
2875 for use_id in new_entry.referenced_by() {
2877 if tx.get_item(use_id).is_none() {
2879 continue;
2880 }
2881
2882 let mut dependent = state.get_entry(use_id).clone();
2883 dependent.item = dependent.item.replace_item_refs(id, new_id);
2884 tx.update_item(*use_id, dependent.into())?;
2885 }
2886
2887 let old_comment_id = state.get_comment_id(ObjectId::Item(id));
2889 let new_comment_id = new_entry.comment_object_id();
2890 if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
2891 tx.drop_comments(&[old_comment_id].into())?;
2892 for (sub, comment) in comments {
2893 tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
2894 }
2895 }
2896
2897 let mz_catalog::durable::Item {
2898 id: _,
2899 oid,
2900 global_id,
2901 schema_id,
2902 name,
2903 create_sql,
2904 owner_id,
2905 privileges,
2906 extra_versions,
2907 } = new_entry.into();
2908
2909 tx.remove_item(id)?;
2910 tx.insert_item(
2911 new_id,
2912 oid,
2913 global_id,
2914 schema_id,
2915 &name,
2916 create_sql,
2917 owner_id,
2918 privileges,
2919 extra_versions,
2920 )?;
2921
2922 Ok(())
2923}
2924
2925fn apply_replacement_audit_events(
2927 state: &CatalogState,
2928 target: &CatalogEntry,
2929 replacement: &CatalogEntry,
2930) -> Vec<(EventType, EventDetails)> {
2931 let mut events = Vec::new();
2932
2933 let target_name = state.resolve_full_name(target.name(), target.conn_id());
2934 let target_id_name = IdFullNameV1 {
2935 id: target.id().to_string(),
2936 name: Catalog::full_name_detail(&target_name),
2937 };
2938 let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2939 let replacement_id_name = IdFullNameV1 {
2940 id: replacement.id().to_string(),
2941 name: Catalog::full_name_detail(&replacement_name),
2942 };
2943
2944 if Catalog::should_audit_log_item(&replacement.item) {
2945 events.push((
2946 EventType::Drop,
2947 EventDetails::IdFullNameV1(replacement_id_name.clone()),
2948 ));
2949 }
2950
2951 if Catalog::should_audit_log_item(&target.item) {
2952 events.push((
2953 EventType::Alter,
2954 EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2955 target: target_id_name.clone(),
2956 replacement: replacement_id_name,
2957 }),
2958 ));
2959
2960 if let Some(old_cluster_id) = target.cluster_id()
2961 && let Some(new_cluster_id) = replacement.cluster_id()
2962 && old_cluster_id != new_cluster_id
2963 {
2964 events.push((
2967 EventType::Alter,
2968 EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
2969 id: replacement.id().to_string(),
2970 name: target_id_name.name,
2971 old_cluster_id: old_cluster_id.to_string(),
2972 new_cluster_id: new_cluster_id.to_string(),
2973 }),
2974 ));
2975 }
2976 }
2977
2978 events
2979}
2980
/// Accumulator for everything that has to be dropped, bucketed by object kind.
///
/// Built by [`ObjectsToDrop::generate`] from a sequence of `DropObjectInfo` values.
#[derive(Debug, Default)]
pub(crate) struct ObjectsToDrop {
    /// Comment object IDs of every object that was added, including items added implicitly
    /// as dependents of a dropped cluster replica.
    pub comments: BTreeSet<CommentObjectId>,
    /// Databases to drop.
    pub databases: BTreeSet<DatabaseId>,
    /// Schemas to drop, each mapped to the database specifier it was requested with.
    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
    /// Clusters to drop.
    pub clusters: BTreeSet<ClusterId>,
    /// Cluster replicas to drop, each mapped to its owning cluster and the reason for the drop.
    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
    /// Roles to drop.
    pub roles: BTreeSet<RoleId>,
    /// Items to drop, in the order they were added. This is a `Vec`, so the container itself
    /// does not deduplicate repeated IDs.
    pub items: Vec<CatalogItemId>,
    /// Network policies to drop.
    pub network_policies: BTreeSet<NetworkPolicyId>,
}
2999
3000impl ObjectsToDrop {
3001 pub fn generate(
3002 drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
3003 state: &CatalogState,
3004 session: Option<&ConnMeta>,
3005 ) -> Result<Self, AdapterError> {
3006 let mut delta = ObjectsToDrop::default();
3007
3008 for drop_object_info in drop_object_infos {
3009 delta.add_item(drop_object_info, state, session)?;
3010 }
3011
3012 Ok(delta)
3013 }
3014
3015 fn add_item(
3016 &mut self,
3017 drop_object_info: DropObjectInfo,
3018 state: &CatalogState,
3019 session: Option<&ConnMeta>,
3020 ) -> Result<(), AdapterError> {
3021 self.comments
3022 .insert(state.get_comment_id(drop_object_info.to_object_id()));
3023
3024 match drop_object_info {
3025 DropObjectInfo::Database(database_id) => {
3026 let database = &state.database_by_id[&database_id];
3027 if database_id.is_system() {
3028 return Err(AdapterError::Catalog(Error::new(
3029 ErrorKind::ReadOnlyDatabase(database.name().to_string()),
3030 )));
3031 }
3032
3033 self.databases.insert(database_id);
3034 }
3035 DropObjectInfo::Schema((database_spec, schema_spec)) => {
3036 let schema = state.get_schema(
3037 &database_spec,
3038 &schema_spec,
3039 session
3040 .map(|session| session.conn_id())
3041 .unwrap_or(&SYSTEM_CONN_ID),
3042 );
3043 let schema_id: SchemaId = schema_spec.into();
3044 if schema_id.is_system() {
3045 let name = schema.name();
3046 let full_name = state.resolve_full_schema_name(name);
3047 return Err(AdapterError::Catalog(Error::new(
3048 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
3049 )));
3050 }
3051
3052 self.schemas.insert(schema_spec, database_spec);
3053 }
3054 DropObjectInfo::Role(role_id) => {
3055 let name = state.get_role(&role_id).name().to_string();
3056 if role_id.is_system() || role_id.is_predefined() {
3057 return Err(AdapterError::Catalog(Error::new(
3058 ErrorKind::ReservedRoleName(name.clone()),
3059 )));
3060 }
3061 state.ensure_not_reserved_role(&role_id)?;
3062
3063 self.roles.insert(role_id);
3064 }
3065 DropObjectInfo::Cluster(cluster_id) => {
3066 let cluster = state.get_cluster(cluster_id);
3067 let name = &cluster.name;
3068 if cluster_id.is_system() {
3069 return Err(AdapterError::Catalog(Error::new(
3070 ErrorKind::ReadOnlyCluster(name.clone()),
3071 )));
3072 }
3073
3074 self.clusters.insert(cluster_id);
3075 }
3076 DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
3077 let cluster = state.get_cluster(cluster_id);
3078 let replica = cluster.replica(replica_id).expect("Must exist");
3079
3080 self.replicas
3081 .insert(replica.replica_id, (cluster.id, reason));
3082
3083 let mut seen: BTreeSet<ObjectId> =
3102 self.items.iter().copied().map(ObjectId::Item).collect();
3103 for dep in state.cluster_replica_dependents(cluster_id, replica_id, &mut seen) {
3104 if let ObjectId::Item(dep_id) = dep {
3105 info!(
3106 "implicitly dropping {} because target replica was dropped",
3107 state.get_entry(&dep_id).name().item,
3108 );
3109 self.comments
3110 .insert(state.get_comment_id(ObjectId::Item(dep_id)));
3111 self.items.push(dep_id);
3112 }
3113 }
3114 }
3115 DropObjectInfo::Item(item_id) => {
3116 let entry = state.get_entry(&item_id);
3117 if item_id.is_system() {
3118 let name = entry.name();
3119 let full_name =
3120 state.resolve_full_name(name, session.map(|session| session.conn_id()));
3121 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
3122 full_name.to_string(),
3123 ))));
3124 }
3125
3126 self.items.push(item_id);
3127 }
3128 DropObjectInfo::NetworkPolicy(network_policy_id) => {
3129 let policy = state.get_network_policy(&network_policy_id);
3130 let name = &policy.name;
3131 if network_policy_id.is_system() {
3132 return Err(AdapterError::Catalog(Error::new(
3133 ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
3134 )));
3135 }
3136
3137 self.network_policies.insert(network_policy_id);
3138 }
3139 }
3140
3141 Ok(())
3142 }
3143}
3144
3145#[cfg(test)]
3146mod tests {
3147 use mz_catalog::SYSTEM_CONN_ID;
3148 use mz_catalog::memory::objects::{CatalogItem, Table, TableDataSource};
3149 use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
3150 use mz_repr::role_id::RoleId;
3151 use mz_repr::{RelationDesc, RelationVersion, VersionedRelationDesc};
3152 use mz_sql::DEFAULT_SCHEMA;
3153 use mz_sql::catalog::CatalogDatabase;
3154 use mz_sql::names::{
3155 ItemQualifiers, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
3156 };
3157 use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
3158
3159 use crate::catalog::{Catalog, Op};
3160 use crate::session::DEFAULT_DATABASE_NAME;
3161
3162 #[mz_ore::test]
3163 fn test_update_privilege_owners() {
3164 let old_owner = RoleId::User(1);
3165 let new_owner = RoleId::User(2);
3166 let other_role = RoleId::User(3);
3167
3168 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3170 MzAclItem {
3171 grantee: other_role,
3172 grantor: old_owner,
3173 acl_mode: AclMode::UPDATE,
3174 },
3175 MzAclItem {
3176 grantee: other_role,
3177 grantor: new_owner,
3178 acl_mode: AclMode::SELECT,
3179 },
3180 ]);
3181 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3182 assert_eq!(1, privileges.all_values().count());
3183 assert_eq!(
3184 vec![MzAclItem {
3185 grantee: other_role,
3186 grantor: new_owner,
3187 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3188 }],
3189 privileges.all_values_owned().collect::<Vec<_>>()
3190 );
3191
3192 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3194 MzAclItem {
3195 grantee: old_owner,
3196 grantor: other_role,
3197 acl_mode: AclMode::UPDATE,
3198 },
3199 MzAclItem {
3200 grantee: new_owner,
3201 grantor: other_role,
3202 acl_mode: AclMode::SELECT,
3203 },
3204 ]);
3205 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3206 assert_eq!(1, privileges.all_values().count());
3207 assert_eq!(
3208 vec![MzAclItem {
3209 grantee: new_owner,
3210 grantor: other_role,
3211 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3212 }],
3213 privileges.all_values_owned().collect::<Vec<_>>()
3214 );
3215
3216 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3218 MzAclItem {
3219 grantee: old_owner,
3220 grantor: old_owner,
3221 acl_mode: AclMode::UPDATE,
3222 },
3223 MzAclItem {
3224 grantee: new_owner,
3225 grantor: new_owner,
3226 acl_mode: AclMode::SELECT,
3227 },
3228 ]);
3229 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3230 assert_eq!(1, privileges.all_values().count());
3231 assert_eq!(
3232 vec![MzAclItem {
3233 grantee: new_owner,
3234 grantor: new_owner,
3235 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3236 }],
3237 privileges.all_values_owned().collect::<Vec<_>>()
3238 );
3239 }
3240
/// Checks that running `transact_incremental_dry_run` in two steps gives the same entries
/// as running it once with both ops.
///
/// Step one applies "create t1" to the base state. Step two applies only "create t2",
/// starting from the state and snapshot returned by step one. The result is compared
/// (by name and owner of `t1` and `t2`) with a single call that applies both ops to the
/// base state.
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)]
async fn test_transact_incremental_dry_run_processes_only_new_ops() {
    Catalog::with_debug(|catalog| async move {
        // Resolve the default database and schema that the test tables are created in.
        let database = catalog
            .resolve_database(DEFAULT_DATABASE_NAME)
            .expect("default database");
        let database_name = database.name.clone();
        let database_spec = ResolvedDatabaseSpecifier::Id(database.id());
        let schema = catalog
            .resolve_schema_in_database(&database_spec, DEFAULT_SCHEMA, &SYSTEM_CONN_ID)
            .expect("default schema");
        let schema_name = schema.name.schema.clone();
        let schema_spec = schema.id.clone();

        // Allocate one (item ID, global ID) pair per table.
        let (id_t1, global_id_t1) = catalog
            .allocate_user_id_for_test()
            .await
            .expect("allocate id for t1");
        let (id_t2, global_id_t2) = catalog
            .allocate_user_id_for_test()
            .await
            .expect("allocate id for t2");

        let oracle_write_ts = catalog.current_upper().await;

        // Builds an `Op::CreateItem` for an empty table called `name` in the default schema.
        let make_table_op = |id, global_id, name: &str| Op::CreateItem {
            id,
            name: QualifiedItemName {
                qualifiers: ItemQualifiers {
                    database_spec: database_spec.clone(),
                    schema_spec: schema_spec.clone(),
                },
                item: name.to_string(),
            },
            item: CatalogItem::Table(Table {
                create_sql: Some(format!(
                    "CREATE TABLE {database_name}.{schema_name}.{name} ()"
                )),
                desc: VersionedRelationDesc::new(RelationDesc::empty()),
                collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
                conn_id: None,
                resolved_ids: ResolvedIds::empty(),
                custom_logical_compaction_window: None,
                is_retained_metrics_object: false,
                data_source: TableDataSource::TableWrites { defaults: vec![] },
            }),
            owner_id: MZ_SYSTEM_ROLE_ID,
        };

        let op_t1 = make_table_op(id_t1, global_id_t1, "t1");
        let op_t2 = make_table_op(id_t2, global_id_t2, "t2");

        // Both the two-step run and the all-at-once run start from this state.
        let base_state = catalog.state().clone();

        // Step one: apply only "create t1" to the base state, without a prior snapshot.
        let (state_after_t1, snapshot_after_t1) = catalog
            .transact_incremental_dry_run(
                &base_state,
                vec![op_t1.clone()],
                None,
                None,
                oracle_write_ts,
            )
            .await
            .expect("first dry run");

        // After step one only t1 may exist.
        assert!(
            state_after_t1.try_get_entry(&id_t1).is_some(),
            "t1 should exist after first dry run"
        );
        assert_eq!(
            state_after_t1
                .try_get_entry(&id_t1)
                .expect("t1 entry")
                .name()
                .item,
            "t1"
        );
        assert!(
            state_after_t1.try_get_entry(&id_t2).is_none(),
            "t2 should NOT exist after first dry run"
        );

        // Step two: apply only "create t2", passing the state and snapshot from step one.
        let (state_incremental, _) = catalog
            .transact_incremental_dry_run(
                &state_after_t1,
                vec![op_t2.clone()],
                None,
                Some(snapshot_after_t1),
                oracle_write_ts,
            )
            .await
            .expect("second dry run");

        // The incremental result must contain both tables.
        assert!(
            state_incremental.try_get_entry(&id_t1).is_some(),
            "t1 should exist in incremental result"
        );
        assert!(
            state_incremental.try_get_entry(&id_t2).is_some(),
            "t2 should exist in incremental result"
        );

        // Reference run: apply both ops in one call to the base state, without a snapshot.
        let (state_all_at_once, _) = catalog
            .transact_incremental_dry_run(
                &base_state,
                vec![op_t1.clone(), op_t2.clone()],
                None,
                None,
                oracle_write_ts,
            )
            .await
            .expect("all-at-once dry run");

        assert!(
            state_all_at_once.try_get_entry(&id_t1).is_some(),
            "t1 should exist in all-at-once result"
        );
        assert!(
            state_all_at_once.try_get_entry(&id_t2).is_some(),
            "t2 should exist in all-at-once result"
        );

        // The two runs must agree on the name and owner of each table.
        let inc_t1 = state_incremental.try_get_entry(&id_t1).expect("inc t1");
        let all_t1 = state_all_at_once.try_get_entry(&id_t1).expect("all t1");
        assert_eq!(inc_t1.name(), all_t1.name());
        assert_eq!(inc_t1.owner_id, all_t1.owner_id);

        let inc_t2 = state_incremental.try_get_entry(&id_t2).expect("inc t2");
        let all_t2 = state_all_at_once.try_get_entry(&id_t2).expect("all t2");
        assert_eq!(inc_t2.name(), all_t2.name());
        assert_eq!(inc_t2.owner_id, all_t2.owner_id);

        catalog.expire().await;
    })
    .await
}
3397}