1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22 WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26 ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Snapshot, Transaction};
31use mz_catalog::expr_cache::LocalExpressions;
32use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
33use mz_catalog::memory::objects::{
34 CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, DefaultPrivileges, SourceReferences,
35 StateDiff, StateUpdate, StateUpdateKind, TemporaryItem,
36};
37use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
38use mz_controller_types::{ClusterId, ReplicaId};
39use mz_ore::collections::HashSet;
40use mz_ore::instrument;
41use mz_ore::now::EpochMillis;
42use mz_persist_types::ShardId;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
44use mz_repr::network_policy_id::NetworkPolicyId;
45use mz_repr::optimize::OptimizerFeatures;
46use mz_repr::role_id::RoleId;
47use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
48use mz_sql::ast::RawDataType;
49use mz_sql::catalog::{
50 AutoProvisionSource, CatalogDatabase, CatalogError as SqlCatalogError,
51 CatalogItem as SqlCatalogItem, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
52 DefaultPrivilegeObject, PasswordAction, PasswordConfig, RoleAttributesRaw, RoleMembership,
53 RoleVars,
54};
55use mz_sql::names::{
56 CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
57 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
58};
59use mz_sql::plan::{NetworkPolicyRule, PlanError};
60use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
61use mz_sql::session::vars::OwnedVarInput;
62use mz_sql::session::vars::{Value as VarValue, VarInput};
63use mz_sql::{DEFAULT_SCHEMA, rbac};
64use mz_sql_parser::ast::{QualifiedReplica, Value};
65use mz_storage_client::storage_collections::StorageCollections;
66use serde::{Deserialize, Serialize};
67use tracing::{info, trace};
68
69use crate::AdapterError;
70use crate::catalog::state::LocalExpressionCache;
71use crate::catalog::{
72 BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
73 catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
74 is_reserved_role_name, object_type_to_audit_object_type,
75 system_object_type_to_audit_object_type,
76};
77use crate::coord::ConnMeta;
78use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
79use crate::coord::cluster_scheduling::SchedulingDecision;
80use crate::util::ResultExt;
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct InjectedAuditEvent {
88 pub event_type: EventType,
89 pub object_type: ObjectType,
90 pub details: EventDetails,
91 pub user: Option<String>,
92}
93
94#[derive(Debug, Clone)]
95pub enum Op {
96 AlterRetainHistory {
97 id: CatalogItemId,
98 value: Option<Value>,
99 window: CompactionWindow,
100 },
101 AlterSourceTimestampInterval {
102 id: CatalogItemId,
103 value: Option<Value>,
104 interval: Duration,
105 },
106 AlterRole {
107 id: RoleId,
108 name: String,
109 attributes: RoleAttributesRaw,
110 nopassword: bool,
111 vars: RoleVars,
112 },
113 AlterNetworkPolicy {
114 id: NetworkPolicyId,
115 rules: Vec<NetworkPolicyRule>,
116 name: String,
117 owner_id: RoleId,
118 },
119 AlterAddColumn {
120 id: CatalogItemId,
121 new_global_id: GlobalId,
122 name: ColumnName,
123 typ: SqlColumnType,
124 sql: RawDataType,
125 },
126 AlterMaterializedViewApplyReplacement {
127 id: CatalogItemId,
128 replacement_id: CatalogItemId,
129 },
130 CreateDatabase {
131 name: String,
132 owner_id: RoleId,
133 },
134 CreateSchema {
135 database_id: ResolvedDatabaseSpecifier,
136 schema_name: String,
137 owner_id: RoleId,
138 },
139 CreateRole {
140 name: String,
141 attributes: RoleAttributesRaw,
142 },
143 CreateCluster {
144 id: ClusterId,
145 name: String,
146 introspection_sources: Vec<&'static BuiltinLog>,
147 owner_id: RoleId,
148 config: ClusterConfig,
149 },
150 CreateClusterReplica {
151 cluster_id: ClusterId,
152 name: String,
153 config: ReplicaConfig,
154 owner_id: RoleId,
155 reason: ReplicaCreateDropReason,
156 },
157 CreateItem {
158 id: CatalogItemId,
159 name: QualifiedItemName,
160 item: CatalogItem,
161 owner_id: RoleId,
162 },
163 CreateNetworkPolicy {
164 rules: Vec<NetworkPolicyRule>,
165 name: String,
166 owner_id: RoleId,
167 },
168 Comment {
169 object_id: CommentObjectId,
170 sub_component: Option<usize>,
171 comment: Option<String>,
172 },
173 DropObjects(Vec<DropObjectInfo>),
174 GrantRole {
175 role_id: RoleId,
176 member_id: RoleId,
177 grantor_id: RoleId,
178 },
179 RenameCluster {
180 id: ClusterId,
181 name: String,
182 to_name: String,
183 check_reserved_names: bool,
184 },
185 RenameClusterReplica {
186 cluster_id: ClusterId,
187 replica_id: ReplicaId,
188 name: QualifiedReplica,
189 to_name: String,
190 },
191 RenameItem {
192 id: CatalogItemId,
193 current_full_name: FullItemName,
194 to_name: String,
195 },
196 RenameSchema {
197 database_spec: ResolvedDatabaseSpecifier,
198 schema_spec: SchemaSpecifier,
199 new_name: String,
200 check_reserved_names: bool,
201 },
202 UpdateOwner {
203 id: ObjectId,
204 new_owner: RoleId,
205 },
206 UpdatePrivilege {
207 target_id: SystemObjectId,
208 privilege: MzAclItem,
209 variant: UpdatePrivilegeVariant,
210 },
211 UpdateDefaultPrivilege {
212 privilege_object: DefaultPrivilegeObject,
213 privilege_acl_item: DefaultPrivilegeAclItem,
214 variant: UpdatePrivilegeVariant,
215 },
216 RevokeRole {
217 role_id: RoleId,
218 member_id: RoleId,
219 grantor_id: RoleId,
220 },
221 UpdateClusterConfig {
222 id: ClusterId,
223 name: String,
224 config: ClusterConfig,
225 },
226 UpdateClusterReplicaConfig {
227 cluster_id: ClusterId,
228 replica_id: ReplicaId,
229 config: ReplicaConfig,
230 },
231 UpdateItem {
232 id: CatalogItemId,
233 name: QualifiedItemName,
234 to_item: CatalogItem,
235 },
236 UpdateSourceReferences {
237 source_id: CatalogItemId,
238 references: SourceReferences,
239 },
240 UpdateSystemConfiguration {
241 name: String,
242 value: OwnedVarInput,
243 },
244 ResetSystemConfiguration {
245 name: String,
246 },
247 ResetAllSystemConfiguration,
248 WeirdStorageUsageUpdates {
255 object_id: Option<String>,
256 size_bytes: u64,
257 collection_timestamp: EpochMillis,
258 },
259 InjectAuditEvents {
265 events: Vec<InjectedAuditEvent>,
266 },
267}
268
269#[derive(Debug, Clone)]
273pub enum DropObjectInfo {
274 Cluster(ClusterId),
275 ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
276 Database(DatabaseId),
277 Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
278 Role(RoleId),
279 Item(CatalogItemId),
280 NetworkPolicy(NetworkPolicyId),
281}
282
283impl DropObjectInfo {
284 pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
287 match id {
288 ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
289 ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
290 cluster_id,
291 replica_id,
292 ReplicaCreateDropReason::Manual,
293 )),
294 ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
295 ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
296 ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
297 ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
298 ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
299 }
300 }
301
302 fn to_object_id(&self) -> ObjectId {
305 match &self {
306 DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
307 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
308 ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
309 }
310 DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
311 DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
312 DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
313 DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
314 DropObjectInfo::NetworkPolicy(network_policy_id) => {
315 ObjectId::NetworkPolicy(network_policy_id.clone())
316 }
317 }
318 }
319}
320
321#[derive(Debug, Clone)]
323pub enum ReplicaCreateDropReason {
324 Manual,
329 ClusterScheduling(Vec<SchedulingDecision>),
332}
333
334impl ReplicaCreateDropReason {
335 pub fn into_audit_log(
336 self,
337 ) -> (
338 CreateOrDropClusterReplicaReasonV1,
339 Option<SchedulingDecisionsWithReasonsV2>,
340 ) {
341 let (reason, scheduling_policies) = match self {
342 ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
343 ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
344 CreateOrDropClusterReplicaReasonV1::Schedule,
345 Some(scheduling_decisions),
346 ),
347 };
348 (
349 reason,
350 scheduling_policies
351 .as_ref()
352 .map(SchedulingDecision::reasons_to_audit_log_reasons),
353 )
354 }
355}
356
357pub struct TransactionResult {
358 pub builtin_table_updates: Vec<BuiltinTableUpdate>,
359 pub catalog_updates: Vec<ParsedStateUpdate>,
361 pub audit_events: Vec<VersionedEvent>,
362}
363
364#[derive(Debug, Clone, Copy)]
365enum TransactInnerMode {
366 Commit,
370 DryRun,
377}
378
379impl Catalog {
380 fn should_audit_log_item(item: &CatalogItem) -> bool {
381 !item.is_temporary()
382 }
383
384 fn temporary_ids(
387 &self,
388 ops: &[Op],
389 temporary_drops: BTreeSet<(&ConnectionId, String)>,
390 ) -> Result<BTreeSet<CatalogItemId>, Error> {
391 let mut creating = BTreeSet::new();
392 let mut temporary_ids = BTreeSet::new();
393 for op in ops.iter() {
394 if let Op::CreateItem {
395 id,
396 name,
397 item,
398 owner_id: _,
399 } = op
400 {
401 if let Some(conn_id) = item.conn_id() {
402 if self.item_exists_in_temp_schemas(conn_id, &name.item)
403 && !temporary_drops.contains(&(conn_id, name.item.clone()))
404 || creating.contains(&(conn_id, &name.item))
405 {
406 return Err(
407 SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
408 );
409 } else {
410 creating.insert((conn_id, &name.item));
411 temporary_ids.insert(id.clone());
412 }
413 }
414 }
415 }
416 Ok(temporary_ids)
417 }
418
419 #[instrument(name = "catalog::transact")]
420 pub async fn transact(
421 &mut self,
422 storage_collections: Option<&mut Arc<dyn StorageCollections + Send + Sync>>,
425 oracle_write_ts: mz_repr::Timestamp,
426 session: Option<&ConnMeta>,
427 ops: Vec<Op>,
428 ) -> Result<TransactionResult, AdapterError> {
429 trace!("transact: {:?}", ops);
430 fail::fail_point!("catalog_transact", |arg| {
431 Err(AdapterError::Unstructured(anyhow::anyhow!(
432 "failpoint: {arg:?}"
433 )))
434 });
435
436 let drop_ids: BTreeSet<CatalogItemId> = ops
437 .iter()
438 .filter_map(|op| match op {
439 Op::DropObjects(drop_object_infos) => {
440 let ids = drop_object_infos.iter().map(|info| info.to_object_id());
441 let item_ids = ids.filter_map(|id| match id {
442 ObjectId::Item(id) => Some(id),
443 _ => None,
444 });
445 Some(item_ids)
446 }
447 _ => None,
448 })
449 .flatten()
450 .collect();
451 let temporary_drops = drop_ids
452 .iter()
453 .filter_map(|id| {
454 let entry = self.get_entry(id);
455 match entry.item().conn_id() {
456 Some(conn_id) => Some((conn_id, entry.name().item.clone())),
457 None => None,
458 }
459 })
460 .collect();
461
462 let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
463 let mut builtin_table_updates = vec![];
464 let mut catalog_updates = vec![];
465 let mut audit_events = vec![];
466 let mut storage = self.storage().await;
467 let mut tx = storage
468 .transaction()
469 .await
470 .unwrap_or_terminate("starting catalog transaction");
471
472 let new_state = Self::transact_inner(
473 TransactInnerMode::Commit,
474 storage_collections,
475 oracle_write_ts,
476 session,
477 ops,
478 temporary_ids,
479 &mut builtin_table_updates,
480 &mut catalog_updates,
481 &mut audit_events,
482 &mut tx,
483 &self.state,
484 )
485 .await?;
486
487 tx.commit(oracle_write_ts)
492 .await
493 .unwrap_or_terminate("catalog storage transaction commit must succeed");
494
495 drop(storage);
498 if let Some(new_state) = new_state {
499 self.transient_revision += 1;
500 self.state = new_state;
501 }
502
503 Ok(TransactionResult {
504 builtin_table_updates,
505 catalog_updates,
506 audit_events,
507 })
508 }
509
510 pub async fn transact_incremental_dry_run(
525 &self,
526 base_state: &CatalogState,
527 ops: Vec<Op>,
528 session: Option<&ConnMeta>,
529 prev_snapshot: Option<Snapshot>,
530 oracle_write_ts: mz_repr::Timestamp,
531 ) -> Result<(CatalogState, Snapshot), AdapterError> {
532 let temporary_ids = self.temporary_ids(&ops, BTreeSet::new())?;
535
536 let mut builtin_table_updates = vec![];
537 let mut catalog_updates = vec![];
538 let mut audit_events = vec![];
539 let mut storage = self.storage().await;
540 let mut tx = if let Some(snapshot) = prev_snapshot {
541 storage
544 .transaction_from_snapshot(snapshot)
545 .unwrap_or_terminate("starting catalog transaction from snapshot")
546 } else {
547 storage
550 .transaction()
551 .await
552 .unwrap_or_terminate("starting catalog transaction")
553 };
554
555 let new_state = Self::transact_inner(
557 TransactInnerMode::DryRun,
558 None,
559 oracle_write_ts,
560 session,
561 ops,
562 temporary_ids,
563 &mut builtin_table_updates,
564 &mut catalog_updates,
565 &mut audit_events,
566 &mut tx,
567 base_state,
568 )
569 .await?;
570
571 let new_snapshot = tx.current_snapshot();
574
575 drop(storage);
577
578 let state = new_state.unwrap_or_else(|| base_state.clone());
580 Ok((state, new_snapshot))
581 }
582
583 fn extract_expressions_from_ops(
587 ops: &[Op],
588 optimizer_features: &OptimizerFeatures,
589 ) -> BTreeMap<GlobalId, LocalExpressions> {
590 let mut exprs = BTreeMap::new();
591
592 for op in ops {
593 if let Op::CreateItem { item, .. } = op {
594 match item {
595 CatalogItem::View(view) => {
596 exprs.insert(
597 view.global_id,
598 LocalExpressions {
599 local_mir: (*view.locally_optimized_expr).clone(),
600 optimizer_features: optimizer_features.clone(),
601 },
602 );
603 }
604 CatalogItem::MaterializedView(mv) => {
605 exprs.insert(
606 mv.global_id_writes(),
607 LocalExpressions {
608 local_mir: (*mv.locally_optimized_expr).clone(),
609 optimizer_features: optimizer_features.clone(),
610 },
611 );
612 }
613 CatalogItem::Table(_)
614 | CatalogItem::Source(_)
615 | CatalogItem::Log(_)
616 | CatalogItem::Sink(_)
617 | CatalogItem::Index(_)
618 | CatalogItem::Type(_)
619 | CatalogItem::Func(_)
620 | CatalogItem::Secret(_)
621 | CatalogItem::Connection(_) => {}
622 }
623 }
624 }
625
626 exprs
627 }
628
629 #[instrument(name = "catalog::transact_inner")]
637 async fn transact_inner(
638 mode: TransactInnerMode,
639 storage_collections: Option<&mut Arc<dyn StorageCollections + Send + Sync>>,
640 oracle_write_ts: mz_repr::Timestamp,
641 session: Option<&ConnMeta>,
642 ops: Vec<Op>,
643 temporary_ids: BTreeSet<CatalogItemId>,
644 builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
645 parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
646 audit_events: &mut Vec<VersionedEvent>,
647 tx: &mut Transaction<'_>,
648 state: &CatalogState,
649 ) -> Result<Option<CatalogState>, AdapterError> {
650 let mut preliminary_state = Cow::Borrowed(state);
677
678 let mut state = Cow::Borrowed(state);
680
681 if ops.is_empty() {
682 return Ok(None);
683 }
684
685 let optimizer_features = OptimizerFeatures::from(state.system_config());
688 let cached_exprs = Self::extract_expressions_from_ops(&ops, &optimizer_features);
689
690 let mut storage_collections_to_create = BTreeSet::new();
691 let mut storage_collections_to_drop = BTreeSet::new();
692 let mut storage_collections_to_register = BTreeMap::new();
693
694 let mut updates = Vec::new();
695
696 for op in ops {
697 let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
698 oracle_write_ts,
699 session,
700 op,
701 &temporary_ids,
702 audit_events,
703 tx,
704 &*preliminary_state,
705 &mut storage_collections_to_create,
706 &mut storage_collections_to_drop,
707 &mut storage_collections_to_register,
708 )
709 .await?;
710
711 if let Some(builtin_table_update) = weird_builtin_table_update {
720 builtin_table_updates.push(builtin_table_update);
721 }
722
723 let upper = tx.upper();
728 let temporary_item_updates =
729 temporary_item_updates
730 .into_iter()
731 .map(|(item, diff)| StateUpdate {
732 kind: StateUpdateKind::TemporaryItem(item),
733 ts: upper,
734 diff,
735 });
736
737 let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
738 op_updates.extend(temporary_item_updates);
739 if !op_updates.is_empty() {
740 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
743 let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
744 .to_mut()
745 .apply_updates(op_updates.clone(), &mut local_expr_cache)
746 .await;
747 }
748 updates.append(&mut op_updates);
749 }
750
751 if !updates.is_empty() {
752 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
753 let (op_builtin_table_updates, op_catalog_updates) = state
754 .to_mut()
755 .apply_updates(updates.clone(), &mut local_expr_cache)
756 .await;
757 let op_builtin_table_updates = state
758 .to_mut()
759 .resolve_builtin_table_updates(op_builtin_table_updates);
760 builtin_table_updates.extend(op_builtin_table_updates);
761 parsed_catalog_updates.extend(op_catalog_updates);
762 }
763
764 match mode {
765 TransactInnerMode::Commit => {
766 if let Some(c) = storage_collections {
768 c.prepare_state(
769 tx,
770 storage_collections_to_create,
771 storage_collections_to_drop,
772 storage_collections_to_register,
773 )
774 .await?;
775 }
776 }
777 TransactInnerMode::DryRun => {
778 debug_assert!(
779 storage_collections.is_none(),
780 "dry-run mode must not prepare storage state"
781 );
782 }
783 }
784
785 let updates = tx.get_and_commit_op_updates();
786 if !updates.is_empty() {
787 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
788 let (op_builtin_table_updates, op_catalog_updates) = state
789 .to_mut()
790 .apply_updates(updates.clone(), &mut local_expr_cache)
791 .await;
792 let op_builtin_table_updates = state
793 .to_mut()
794 .resolve_builtin_table_updates(op_builtin_table_updates);
795 builtin_table_updates.extend(op_builtin_table_updates);
796 parsed_catalog_updates.extend(op_catalog_updates);
797 }
798
799 match state {
800 Cow::Owned(state) => Ok(Some(state)),
801 Cow::Borrowed(_) => Ok(None),
802 }
803 }
804
805 #[instrument]
813 async fn transact_op(
814 oracle_write_ts: mz_repr::Timestamp,
815 session: Option<&ConnMeta>,
816 op: Op,
817 temporary_ids: &BTreeSet<CatalogItemId>,
818 audit_events: &mut Vec<VersionedEvent>,
819 tx: &mut Transaction<'_>,
820 state: &CatalogState,
821 storage_collections_to_create: &mut BTreeSet<GlobalId>,
822 storage_collections_to_drop: &mut BTreeSet<GlobalId>,
823 storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
824 ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
825 let mut weird_builtin_table_update = None;
826 let mut temporary_item_updates = Vec::new();
827
828 match op {
829 Op::AlterRetainHistory { id, value, window } => {
830 let entry = state.get_entry(&id);
831 if id.is_system() {
832 let name = entry.name();
833 let full_name =
834 state.resolve_full_name(name, session.map(|session| session.conn_id()));
835 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
836 full_name.to_string(),
837 ))));
838 }
839
840 let mut new_entry = entry.clone();
841 let previous = new_entry
842 .item
843 .update_retain_history(value.clone(), window)
844 .map_err(|_| {
845 AdapterError::Catalog(Error::new(ErrorKind::Internal(
846 "planner should have rejected invalid alter retain history item type"
847 .to_string(),
848 )))
849 })?;
850
851 if Self::should_audit_log_item(new_entry.item()) {
852 let details =
853 EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
854 id: id.to_string(),
855 old_history: previous.map(|previous| previous.to_string()),
856 new_history: value.map(|v| v.to_string()),
857 });
858 CatalogState::add_to_audit_log(
859 &state.system_configuration,
860 oracle_write_ts,
861 session,
862 tx,
863 audit_events,
864 EventType::Alter,
865 catalog_type_to_audit_object_type(new_entry.item().typ()),
866 details,
867 )?;
868 }
869
870 tx.update_item(id, new_entry.into())?;
871
872 Self::log_update(state, &id);
873 }
874 Op::AlterSourceTimestampInterval {
875 id,
876 value,
877 interval,
878 } => {
879 let entry = state.get_entry(&id);
880 if id.is_system() {
881 let name = entry.name();
882 let full_name =
883 state.resolve_full_name(name, session.map(|session| session.conn_id()));
884 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
885 full_name.to_string(),
886 ))));
887 }
888
889 let mut new_entry = entry.clone();
890 new_entry
891 .item
892 .update_timestamp_interval(value, interval)
893 .map_err(|_| {
894 AdapterError::Catalog(Error::new(ErrorKind::Internal(
895 "planner should have rejected invalid alter timestamp interval item type"
896 .to_string(),
897 )))
898 })?;
899
900 tx.update_item(id, new_entry.into())?;
901
902 Self::log_update(state, &id);
903 }
904 Op::AlterRole {
905 id,
906 name,
907 attributes,
908 nopassword,
909 vars,
910 } => {
911 state.ensure_not_reserved_role(&id)?;
912
913 let mut existing_role = state.get_role(&id).clone();
914 let password = attributes.password.clone();
915 let scram_iterations = attributes
916 .scram_iterations
917 .unwrap_or_else(|| state.system_config().scram_iterations());
918 existing_role.attributes = attributes.into();
919 existing_role.vars = vars;
920 let password_action = if nopassword {
921 PasswordAction::Clear
922 } else if let Some(password) = password {
923 PasswordAction::Set(PasswordConfig {
924 password,
925 scram_iterations,
926 })
927 } else {
928 PasswordAction::NoChange
929 };
930 tx.update_role(id, existing_role.into(), password_action)?;
931
932 CatalogState::add_to_audit_log(
933 &state.system_configuration,
934 oracle_write_ts,
935 session,
936 tx,
937 audit_events,
938 EventType::Alter,
939 ObjectType::Role,
940 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
941 id: id.to_string(),
942 name: name.clone(),
943 }),
944 )?;
945
946 info!("update role {name} ({id})");
947 }
948 Op::AlterNetworkPolicy {
949 id,
950 rules,
951 name,
952 owner_id: _owner_id,
953 } => {
954 let existing_policy = state.get_network_policy(&id).clone();
955 let mut policy: NetworkPolicy = existing_policy.into();
956 policy.rules = rules;
957 if is_reserved_name(&name) {
958 return Err(AdapterError::Catalog(Error::new(
959 ErrorKind::ReservedNetworkPolicyName(name),
960 )));
961 }
962 tx.update_network_policy(id, policy.clone())?;
963
964 CatalogState::add_to_audit_log(
965 &state.system_configuration,
966 oracle_write_ts,
967 session,
968 tx,
969 audit_events,
970 EventType::Alter,
971 ObjectType::NetworkPolicy,
972 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
973 id: id.to_string(),
974 name: name.clone(),
975 }),
976 )?;
977
978 info!("update network policy {name} ({id})");
979 }
980 Op::AlterAddColumn {
981 id,
982 new_global_id,
983 name,
984 typ,
985 sql,
986 } => {
987 let mut new_entry = state.get_entry(&id).clone();
988 let version = new_entry.item.add_column(name, typ, sql)?;
989 let shard_id = state
992 .storage_metadata()
993 .get_collection_shard(new_entry.latest_global_id())?;
994
995 let CatalogItem::Table(table) = &mut new_entry.item else {
997 return Err(AdapterError::Unsupported("adding columns to non-Table"));
998 };
999 table.collections.insert(version, new_global_id);
1000
1001 tx.update_item(id, new_entry.into())?;
1002 storage_collections_to_register.insert(new_global_id, shard_id);
1003 }
1004 Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
1005 let mut new_entry = state.get_entry(&id).clone();
1006 let replacement = state.get_entry(&replacement_id);
1007
1008 let new_audit_events =
1009 apply_replacement_audit_events(state, &new_entry, replacement);
1010
1011 let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
1012 return Err(AdapterError::internal(
1013 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1014 "id must refer to a materialized view",
1015 ));
1016 };
1017 let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
1018 return Err(AdapterError::internal(
1019 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1020 "replacement_id must refer to a materialized view",
1021 ));
1022 };
1023
1024 mv.apply_replacement(replacement_mv.clone());
1025
1026 tx.remove_item(replacement_id)?;
1027
1028 new_entry.id = replacement_id;
1029 tx_replace_item(tx, state, id, new_entry)?;
1030
1031 let comment_id = CommentObjectId::MaterializedView(replacement_id);
1032 tx.drop_comments(&[comment_id].into())?;
1033
1034 for (event_type, details) in new_audit_events {
1035 CatalogState::add_to_audit_log(
1036 &state.system_configuration,
1037 oracle_write_ts,
1038 session,
1039 tx,
1040 audit_events,
1041 event_type,
1042 ObjectType::MaterializedView,
1043 details,
1044 )?;
1045 }
1046 }
1047 Op::CreateDatabase { name, owner_id } => {
1048 let database_owner_privileges = vec![rbac::owner_privilege(
1049 mz_sql::catalog::ObjectType::Database,
1050 owner_id,
1051 )];
1052 let database_default_privileges = state
1053 .default_privileges
1054 .get_applicable_privileges(
1055 owner_id,
1056 None,
1057 None,
1058 mz_sql::catalog::ObjectType::Database,
1059 )
1060 .map(|item| item.mz_acl_item(owner_id));
1061 let database_privileges: Vec<_> = merge_mz_acl_items(
1062 database_owner_privileges
1063 .into_iter()
1064 .chain(database_default_privileges),
1065 )
1066 .collect();
1067
1068 let schema_owner_privileges = vec![rbac::owner_privilege(
1069 mz_sql::catalog::ObjectType::Schema,
1070 owner_id,
1071 )];
1072 let schema_default_privileges = state
1073 .default_privileges
1074 .get_applicable_privileges(
1075 owner_id,
1076 None,
1077 None,
1078 mz_sql::catalog::ObjectType::Schema,
1079 )
1080 .map(|item| item.mz_acl_item(owner_id))
1081 .chain(std::iter::once(MzAclItem {
1083 grantee: RoleId::Public,
1084 grantor: owner_id,
1085 acl_mode: AclMode::USAGE,
1086 }));
1087 let schema_privileges: Vec<_> = merge_mz_acl_items(
1088 schema_owner_privileges
1089 .into_iter()
1090 .chain(schema_default_privileges),
1091 )
1092 .collect();
1093
1094 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1095 let (database_id, _) = tx.insert_user_database(
1096 &name,
1097 owner_id,
1098 database_privileges.clone(),
1099 &temporary_oids,
1100 )?;
1101 let (schema_id, _) = tx.insert_user_schema(
1102 database_id,
1103 DEFAULT_SCHEMA,
1104 owner_id,
1105 schema_privileges.clone(),
1106 &temporary_oids,
1107 )?;
1108 CatalogState::add_to_audit_log(
1109 &state.system_configuration,
1110 oracle_write_ts,
1111 session,
1112 tx,
1113 audit_events,
1114 EventType::Create,
1115 ObjectType::Database,
1116 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1117 id: database_id.to_string(),
1118 name: name.clone(),
1119 }),
1120 )?;
1121 info!("create database {}", name);
1122
1123 CatalogState::add_to_audit_log(
1124 &state.system_configuration,
1125 oracle_write_ts,
1126 session,
1127 tx,
1128 audit_events,
1129 EventType::Create,
1130 ObjectType::Schema,
1131 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1132 id: schema_id.to_string(),
1133 name: DEFAULT_SCHEMA.to_string(),
1134 database_name: Some(name),
1135 }),
1136 )?;
1137 }
1138 Op::CreateSchema {
1139 database_id,
1140 schema_name,
1141 owner_id,
1142 } => {
1143 if is_reserved_name(&schema_name) {
1144 return Err(AdapterError::Catalog(Error::new(
1145 ErrorKind::ReservedSchemaName(schema_name),
1146 )));
1147 }
1148 let database_id = match database_id {
1149 ResolvedDatabaseSpecifier::Id(id) => id,
1150 ResolvedDatabaseSpecifier::Ambient => {
1151 return Err(AdapterError::Catalog(Error::new(
1152 ErrorKind::ReadOnlySystemSchema(schema_name),
1153 )));
1154 }
1155 };
1156 let owner_privileges = vec![rbac::owner_privilege(
1157 mz_sql::catalog::ObjectType::Schema,
1158 owner_id,
1159 )];
1160 let default_privileges = state
1161 .default_privileges
1162 .get_applicable_privileges(
1163 owner_id,
1164 Some(database_id),
1165 None,
1166 mz_sql::catalog::ObjectType::Schema,
1167 )
1168 .map(|item| item.mz_acl_item(owner_id));
1169 let privileges: Vec<_> =
1170 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1171 .collect();
1172 let (schema_id, _) = tx.insert_user_schema(
1173 database_id,
1174 &schema_name,
1175 owner_id,
1176 privileges.clone(),
1177 &state.get_temporary_oids().collect(),
1178 )?;
1179 CatalogState::add_to_audit_log(
1180 &state.system_configuration,
1181 oracle_write_ts,
1182 session,
1183 tx,
1184 audit_events,
1185 EventType::Create,
1186 ObjectType::Schema,
1187 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1188 id: schema_id.to_string(),
1189 name: schema_name.clone(),
1190 database_name: Some(state.database_by_id[&database_id].name.clone()),
1191 }),
1192 )?;
1193 }
1194 Op::CreateRole { name, attributes } => {
1195 if is_reserved_role_name(&name) {
1196 return Err(AdapterError::Catalog(Error::new(
1197 ErrorKind::ReservedRoleName(name),
1198 )));
1199 }
1200 let membership = RoleMembership::new();
1201 let vars = RoleVars::default();
1202 let (id, _) = tx.insert_user_role(
1203 name.clone(),
1204 attributes.clone(),
1205 membership.clone(),
1206 vars.clone(),
1207 &state.get_temporary_oids().collect(),
1208 )?;
1209 CatalogState::add_to_audit_log(
1210 &state.system_configuration,
1211 oracle_write_ts,
1212 session,
1213 tx,
1214 audit_events,
1215 EventType::Create,
1216 ObjectType::Role,
1217 EventDetails::CreateRoleV1(mz_audit_log::CreateRoleV1 {
1218 id: id.to_string(),
1219 name: name.clone(),
1220 auto_provision_source: attributes.auto_provision_source.map(|s| match s {
1221 AutoProvisionSource::Oidc => "oidc".to_string(),
1222 AutoProvisionSource::Frontegg => "frontegg".to_string(),
1223 AutoProvisionSource::None => "none".to_string(),
1224 }),
1225 }),
1226 )?;
1227 info!("create role {}", name);
1228 }
1229 Op::CreateCluster {
1230 id,
1231 name,
1232 introspection_sources,
1233 owner_id,
1234 config,
1235 } => {
1236 if is_reserved_name(&name) {
1237 return Err(AdapterError::Catalog(Error::new(
1238 ErrorKind::ReservedClusterName(name),
1239 )));
1240 }
1241 let owner_privileges = vec![rbac::owner_privilege(
1242 mz_sql::catalog::ObjectType::Cluster,
1243 owner_id,
1244 )];
1245 let default_privileges = state
1246 .default_privileges
1247 .get_applicable_privileges(
1248 owner_id,
1249 None,
1250 None,
1251 mz_sql::catalog::ObjectType::Cluster,
1252 )
1253 .map(|item| item.mz_acl_item(owner_id));
1254 let privileges: Vec<_> =
1255 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1256 .collect();
1257 let introspection_source_ids: Vec<_> = introspection_sources
1258 .iter()
1259 .map(|introspection_source| {
1260 Transaction::allocate_introspection_source_index_id(
1261 &id,
1262 introspection_source.variant,
1263 )
1264 })
1265 .collect();
1266
1267 let introspection_sources = introspection_sources
1268 .into_iter()
1269 .zip_eq(introspection_source_ids)
1270 .map(|(log, (item_id, gid))| (log, item_id, gid))
1271 .collect();
1272
1273 tx.insert_user_cluster(
1274 id,
1275 &name,
1276 introspection_sources,
1277 owner_id,
1278 privileges.clone(),
1279 config.clone().into(),
1280 &state.get_temporary_oids().collect(),
1281 )?;
1282 CatalogState::add_to_audit_log(
1283 &state.system_configuration,
1284 oracle_write_ts,
1285 session,
1286 tx,
1287 audit_events,
1288 EventType::Create,
1289 ObjectType::Cluster,
1290 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1291 id: id.to_string(),
1292 name: name.clone(),
1293 }),
1294 )?;
1295 info!("create cluster {}", name);
1296 }
1297 Op::CreateClusterReplica {
1298 cluster_id,
1299 name,
1300 config,
1301 owner_id,
1302 reason,
1303 } => {
1304 if is_reserved_name(&name) {
1305 return Err(AdapterError::Catalog(Error::new(
1306 ErrorKind::ReservedReplicaName(name),
1307 )));
1308 }
1309 let cluster = state.get_cluster(cluster_id);
1310 let id =
1311 tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1312 if let ReplicaLocation::Managed(ManagedReplicaLocation {
1313 size,
1314 billed_as,
1315 internal,
1316 ..
1317 }) = &config.location
1318 {
1319 let (reason, scheduling_policies) = reason.into_audit_log();
1320 let details = EventDetails::CreateClusterReplicaV4(
1321 mz_audit_log::CreateClusterReplicaV4 {
1322 cluster_id: cluster_id.to_string(),
1323 cluster_name: cluster.name.clone(),
1324 replica_id: Some(id.to_string()),
1325 replica_name: name.clone(),
1326 logical_size: size.clone(),
1327 billed_as: billed_as.clone(),
1328 internal: *internal,
1329 reason,
1330 scheduling_policies,
1331 },
1332 );
1333 CatalogState::add_to_audit_log(
1334 &state.system_configuration,
1335 oracle_write_ts,
1336 session,
1337 tx,
1338 audit_events,
1339 EventType::Create,
1340 ObjectType::ClusterReplica,
1341 details,
1342 )?;
1343 }
1344 }
1345 Op::CreateItem {
1346 id,
1347 name,
1348 item,
1349 owner_id,
1350 } => {
1351 state.check_unstable_dependencies(&item)?;
1352
1353 match &item {
1354 CatalogItem::Table(table) => {
1355 let gids: Vec<_> = table.global_ids().collect();
1356 assert_eq!(gids.len(), 1);
1357 storage_collections_to_create.extend(gids);
1358 }
1359 CatalogItem::Source(source) => {
1360 storage_collections_to_create.insert(source.global_id());
1361 }
1362 CatalogItem::MaterializedView(mv) => {
1363 let mv_gid = mv.global_id_writes();
1364 if let Some(target_id) = mv.replacement_target {
1365 let target_gid = state.get_entry(&target_id).latest_global_id();
1366 let shard_id =
1367 state.storage_metadata().get_collection_shard(target_gid)?;
1368 storage_collections_to_register.insert(mv_gid, shard_id);
1369 } else {
1370 storage_collections_to_create.insert(mv_gid);
1371 }
1372 }
1373 CatalogItem::Sink(sink) => {
1374 storage_collections_to_create.insert(sink.global_id());
1375 }
1376 CatalogItem::Log(_)
1377 | CatalogItem::View(_)
1378 | CatalogItem::Index(_)
1379 | CatalogItem::Type(_)
1380 | CatalogItem::Func(_)
1381 | CatalogItem::Secret(_)
1382 | CatalogItem::Connection(_) => (),
1383 }
1384
1385 let system_user = session.map_or(false, |s| s.user().is_system_user());
1386 if !system_user {
1387 if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1388 let cluster_name = state.clusters_by_id[&id].name.clone();
1389 return Err(AdapterError::Catalog(Error::new(
1390 ErrorKind::ReadOnlyCluster(cluster_name),
1391 )));
1392 }
1393 }
1394
1395 let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1396 let default_privileges = state
1397 .default_privileges
1398 .get_applicable_privileges(
1399 owner_id,
1400 name.qualifiers.database_spec.id(),
1401 Some(name.qualifiers.schema_spec.into()),
1402 item.typ().into(),
1403 )
1404 .map(|item| item.mz_acl_item(owner_id));
1405 let progress_source_privilege = if item.is_progress_source() {
1407 Some(MzAclItem {
1408 grantee: MZ_SUPPORT_ROLE_ID,
1409 grantor: owner_id,
1410 acl_mode: AclMode::SELECT,
1411 })
1412 } else {
1413 None
1414 };
1415 let privileges: Vec<_> = merge_mz_acl_items(
1416 owner_privileges
1417 .into_iter()
1418 .chain(default_privileges)
1419 .chain(progress_source_privilege),
1420 )
1421 .collect();
1422
1423 let temporary_oids = state.get_temporary_oids().collect();
1424
1425 if item.is_temporary() {
1426 if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1427 || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1428 {
1429 return Err(AdapterError::Catalog(Error::new(
1430 ErrorKind::InvalidTemporarySchema,
1431 )));
1432 }
1433 let oid = tx.allocate_oid(&temporary_oids)?;
1434
1435 let schema_id = name.qualifiers.schema_spec.clone().into();
1436 let item_type = item.typ();
1437 let (create_sql, global_id, versions) = item.to_serialized();
1438
1439 let item = TemporaryItem {
1440 id,
1441 oid,
1442 global_id,
1443 schema_id,
1444 name: name.item.clone(),
1445 create_sql,
1446 conn_id: item.conn_id().cloned(),
1447 owner_id,
1448 privileges: privileges.clone(),
1449 extra_versions: versions,
1450 };
1451 temporary_item_updates.push((item, StateDiff::Addition));
1452
1453 info!(
1454 "create temporary {} {} ({})",
1455 item_type,
1456 state.resolve_full_name(&name, None),
1457 id
1458 );
1459 } else {
1460 if let Some(temp_id) =
1461 item.uses()
1462 .iter()
1463 .find(|id| match state.try_get_entry(*id) {
1464 Some(entry) => entry.item().is_temporary(),
1465 None => temporary_ids.contains(id),
1466 })
1467 {
1468 let temp_item = state.get_entry(temp_id);
1469 return Err(AdapterError::Catalog(Error::new(
1470 ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1471 )));
1472 }
1473 if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1474 && !system_user
1475 {
1476 let schema_name = state
1477 .resolve_full_name(&name, session.map(|session| session.conn_id()))
1478 .schema;
1479 return Err(AdapterError::Catalog(Error::new(
1480 ErrorKind::ReadOnlySystemSchema(schema_name),
1481 )));
1482 }
1483 let schema_id = name.qualifiers.schema_spec.clone().into();
1484 let item_type = item.typ();
1485 let (create_sql, global_id, versions) = item.to_serialized();
1486 tx.insert_user_item(
1487 id,
1488 global_id,
1489 schema_id,
1490 &name.item,
1491 create_sql,
1492 owner_id,
1493 privileges.clone(),
1494 &temporary_oids,
1495 versions,
1496 )?;
1497 info!(
1498 "create {} {} ({})",
1499 item_type,
1500 state.resolve_full_name(&name, None),
1501 id
1502 );
1503 }
1504
1505 if Self::should_audit_log_item(&item) {
1506 let name = Self::full_name_detail(
1507 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1508 );
1509 let details = match &item {
1510 CatalogItem::Source(s) => {
1511 let cluster_id = match s.data_source {
1512 DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1515 match state.get_entry(&ingestion_id).cluster_id() {
1516 Some(cluster_id) => Some(cluster_id.to_string()),
1517 None => None,
1518 }
1519 }
1520 _ => match item.cluster_id() {
1521 Some(cluster_id) => Some(cluster_id.to_string()),
1522 None => None,
1523 },
1524 };
1525
1526 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1527 id: id.to_string(),
1528 cluster_id,
1529 name,
1530 external_type: s.source_type().to_string(),
1531 })
1532 }
1533 CatalogItem::Sink(s) => {
1534 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1535 id: id.to_string(),
1536 cluster_id: Some(s.cluster_id.to_string()),
1537 name,
1538 external_type: s.sink_type().to_string(),
1539 })
1540 }
1541 CatalogItem::Index(i) => {
1542 EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1543 id: id.to_string(),
1544 name,
1545 cluster_id: i.cluster_id.to_string(),
1546 })
1547 }
1548 CatalogItem::MaterializedView(mv) => {
1549 EventDetails::CreateMaterializedViewV1(
1550 mz_audit_log::CreateMaterializedViewV1 {
1551 id: id.to_string(),
1552 name,
1553 cluster_id: mv.cluster_id.to_string(),
1554 replacement_target_id: mv
1555 .replacement_target
1556 .map(|id| id.to_string()),
1557 },
1558 )
1559 }
1560 CatalogItem::Table(_)
1561 | CatalogItem::Log(_)
1562 | CatalogItem::View(_)
1563 | CatalogItem::Type(_)
1564 | CatalogItem::Func(_)
1565 | CatalogItem::Secret(_)
1566 | CatalogItem::Connection(_) => EventDetails::IdFullNameV1(IdFullNameV1 {
1567 id: id.to_string(),
1568 name,
1569 }),
1570 };
1571 CatalogState::add_to_audit_log(
1572 &state.system_configuration,
1573 oracle_write_ts,
1574 session,
1575 tx,
1576 audit_events,
1577 EventType::Create,
1578 catalog_type_to_audit_object_type(item.typ()),
1579 details,
1580 )?;
1581 }
1582 }
1583 Op::CreateNetworkPolicy {
1584 rules,
1585 name,
1586 owner_id,
1587 } => {
1588 if state.network_policies_by_name.contains_key(&name) {
1589 return Err(AdapterError::PlanError(PlanError::Catalog(
1590 SqlCatalogError::NetworkPolicyAlreadyExists(name),
1591 )));
1592 }
1593 if is_reserved_name(&name) {
1594 return Err(AdapterError::Catalog(Error::new(
1595 ErrorKind::ReservedNetworkPolicyName(name),
1596 )));
1597 }
1598
1599 let owner_privileges = vec![rbac::owner_privilege(
1600 mz_sql::catalog::ObjectType::NetworkPolicy,
1601 owner_id,
1602 )];
1603 let default_privileges = state
1604 .default_privileges
1605 .get_applicable_privileges(
1606 owner_id,
1607 None,
1608 None,
1609 mz_sql::catalog::ObjectType::NetworkPolicy,
1610 )
1611 .map(|item| item.mz_acl_item(owner_id));
1612 let privileges: Vec<_> =
1613 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1614 .collect();
1615
1616 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1617 let id = tx.insert_user_network_policy(
1618 name.clone(),
1619 rules,
1620 privileges,
1621 owner_id,
1622 &temporary_oids,
1623 )?;
1624
1625 CatalogState::add_to_audit_log(
1626 &state.system_configuration,
1627 oracle_write_ts,
1628 session,
1629 tx,
1630 audit_events,
1631 EventType::Create,
1632 ObjectType::NetworkPolicy,
1633 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1634 id: id.to_string(),
1635 name: name.clone(),
1636 }),
1637 )?;
1638
1639 info!("created network policy {name} ({id})");
1640 }
1641 Op::Comment {
1642 object_id,
1643 sub_component,
1644 comment,
1645 } => {
1646 tx.update_comment(object_id, sub_component, comment)?;
1647 let entry = state.get_comment_id_entry(&object_id);
1648 let should_log = entry
1649 .map(|entry| Self::should_audit_log_item(entry.item()))
1650 .unwrap_or(true);
1652 if let (Some(conn_id), true) =
1655 (session.map(|session| session.conn_id()), should_log)
1656 {
1657 CatalogState::add_to_audit_log(
1658 &state.system_configuration,
1659 oracle_write_ts,
1660 session,
1661 tx,
1662 audit_events,
1663 EventType::Comment,
1664 comment_id_to_audit_object_type(object_id),
1665 EventDetails::IdNameV1(IdNameV1 {
1666 id: format!("{object_id:?}"),
1668 name: state.comment_id_to_audit_log_name(object_id, conn_id),
1669 }),
1670 )?;
1671 }
1672 }
1673 Op::UpdateSourceReferences {
1674 source_id,
1675 references,
1676 } => {
1677 tx.update_source_references(
1678 source_id,
1679 references
1680 .references
1681 .into_iter()
1682 .map(|reference| reference.into())
1683 .collect(),
1684 references.updated_at,
1685 )?;
1686 }
1687 Op::DropObjects(drop_object_infos) => {
1688 let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1690
1691 tx.drop_comments(&delta.comments)?;
1693
1694 let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1696 delta
1697 .items
1698 .iter()
1699 .map(|id| id)
1700 .partition(|id| !state.get_entry(*id).item().is_temporary());
1701 tx.remove_items(&durable_items_to_drop)?;
1702 temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1703 let entry = state.get_entry(&id);
1704 (entry.clone().into(), StateDiff::Retraction)
1705 }));
1706
1707 for item_id in delta.items {
1708 let entry = state.get_entry(&item_id);
1709
1710 if entry.item().is_storage_collection() {
1711 storage_collections_to_drop.extend(entry.global_ids());
1712 }
1713
1714 if state.source_references.contains_key(&item_id) {
1715 tx.remove_source_references(item_id)?;
1716 }
1717
1718 if Self::should_audit_log_item(entry.item()) {
1719 CatalogState::add_to_audit_log(
1720 &state.system_configuration,
1721 oracle_write_ts,
1722 session,
1723 tx,
1724 audit_events,
1725 EventType::Drop,
1726 catalog_type_to_audit_object_type(entry.item().typ()),
1727 EventDetails::IdFullNameV1(IdFullNameV1 {
1728 id: item_id.to_string(),
1729 name: Self::full_name_detail(&state.resolve_full_name(
1730 entry.name(),
1731 session.map(|session| session.conn_id()),
1732 )),
1733 }),
1734 )?;
1735 }
1736 info!(
1737 "drop {} {} ({})",
1738 entry.item_type(),
1739 state.resolve_full_name(entry.name(), entry.conn_id()),
1740 item_id
1741 );
1742 }
1743
1744 let schemas = delta
1746 .schemas
1747 .iter()
1748 .map(|(schema_spec, database_spec)| {
1749 (SchemaId::from(schema_spec), *database_spec)
1750 })
1751 .collect();
1752 tx.remove_schemas(&schemas)?;
1753
1754 for (schema_spec, database_spec) in delta.schemas {
1755 let schema = state.get_schema(
1756 &database_spec,
1757 &schema_spec,
1758 session
1759 .map(|session| session.conn_id())
1760 .unwrap_or(&SYSTEM_CONN_ID),
1761 );
1762
1763 let schema_id = SchemaId::from(schema_spec);
1764 let database_id = match database_spec {
1765 ResolvedDatabaseSpecifier::Ambient => None,
1766 ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1767 };
1768
1769 CatalogState::add_to_audit_log(
1770 &state.system_configuration,
1771 oracle_write_ts,
1772 session,
1773 tx,
1774 audit_events,
1775 EventType::Drop,
1776 ObjectType::Schema,
1777 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1778 id: schema_id.to_string(),
1779 name: schema.name.schema.to_string(),
1780 database_name: database_id
1781 .map(|database_id| state.database_by_id[&database_id].name.clone()),
1782 }),
1783 )?;
1784 }
1785
1786 tx.remove_databases(&delta.databases)?;
1788
1789 for database_id in delta.databases {
1790 let database = state.get_database(&database_id).clone();
1791
1792 CatalogState::add_to_audit_log(
1793 &state.system_configuration,
1794 oracle_write_ts,
1795 session,
1796 tx,
1797 audit_events,
1798 EventType::Drop,
1799 ObjectType::Database,
1800 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1801 id: database_id.to_string(),
1802 name: database.name.clone(),
1803 }),
1804 )?;
1805 }
1806
1807 tx.remove_user_roles(&delta.roles)?;
1809
1810 for role_id in delta.roles {
1811 let role = state
1812 .roles_by_id
1813 .get(&role_id)
1814 .expect("catalog out of sync");
1815
1816 CatalogState::add_to_audit_log(
1817 &state.system_configuration,
1818 oracle_write_ts,
1819 session,
1820 tx,
1821 audit_events,
1822 EventType::Drop,
1823 ObjectType::Role,
1824 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1825 id: role.id.to_string(),
1826 name: role.name.clone(),
1827 }),
1828 )?;
1829 info!("drop role {}", role.name());
1830 }
1831
1832 tx.remove_network_policies(&delta.network_policies)?;
1834
1835 for network_policy_id in delta.network_policies {
1836 let policy = state
1837 .network_policies_by_id
1838 .get(&network_policy_id)
1839 .expect("catalog out of sync");
1840
1841 CatalogState::add_to_audit_log(
1842 &state.system_configuration,
1843 oracle_write_ts,
1844 session,
1845 tx,
1846 audit_events,
1847 EventType::Drop,
1848 ObjectType::NetworkPolicy,
1849 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1850 id: policy.id.to_string(),
1851 name: policy.name.clone(),
1852 }),
1853 )?;
1854 info!("drop network policy {}", policy.name.clone());
1855 }
1856
1857 let replicas = delta.replicas.keys().copied().collect();
1859 tx.remove_cluster_replicas(&replicas)?;
1860
1861 for (replica_id, (cluster_id, reason)) in delta.replicas {
1862 let cluster = state.get_cluster(cluster_id);
1863 let replica = cluster.replica(replica_id).expect("Must exist");
1864
1865 let (reason, scheduling_policies) = reason.into_audit_log();
1866 let details =
1867 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1868 cluster_id: cluster_id.to_string(),
1869 cluster_name: cluster.name.clone(),
1870 replica_id: Some(replica_id.to_string()),
1871 replica_name: replica.name.clone(),
1872 reason,
1873 scheduling_policies,
1874 });
1875 CatalogState::add_to_audit_log(
1876 &state.system_configuration,
1877 oracle_write_ts,
1878 session,
1879 tx,
1880 audit_events,
1881 EventType::Drop,
1882 ObjectType::ClusterReplica,
1883 details,
1884 )?;
1885 }
1886
1887 tx.remove_clusters(&delta.clusters)?;
1889
1890 for cluster_id in delta.clusters {
1891 let cluster = state.get_cluster(cluster_id);
1892
1893 CatalogState::add_to_audit_log(
1894 &state.system_configuration,
1895 oracle_write_ts,
1896 session,
1897 tx,
1898 audit_events,
1899 EventType::Drop,
1900 ObjectType::Cluster,
1901 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1902 id: cluster.id.to_string(),
1903 name: cluster.name.clone(),
1904 }),
1905 )?;
1906 }
1907 }
1908 Op::GrantRole {
1909 role_id,
1910 member_id,
1911 grantor_id,
1912 } => {
1913 state.ensure_not_reserved_role(&member_id)?;
1914 state.ensure_grantable_role(&role_id)?;
1915 if state.collect_role_membership(&role_id).contains(&member_id) {
1916 let group_role = state.get_role(&role_id);
1917 let member_role = state.get_role(&member_id);
1918 return Err(AdapterError::Catalog(Error::new(
1919 ErrorKind::CircularRoleMembership {
1920 role_name: group_role.name().to_string(),
1921 member_name: member_role.name().to_string(),
1922 },
1923 )));
1924 }
1925 let mut member_role = state.get_role(&member_id).clone();
1926 member_role.membership.map.insert(role_id, grantor_id);
1927 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1928
1929 CatalogState::add_to_audit_log(
1930 &state.system_configuration,
1931 oracle_write_ts,
1932 session,
1933 tx,
1934 audit_events,
1935 EventType::Grant,
1936 ObjectType::Role,
1937 EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1938 role_id: role_id.to_string(),
1939 member_id: member_id.to_string(),
1940 grantor_id: grantor_id.to_string(),
1941 executed_by: session
1942 .map(|session| session.authenticated_role_id())
1943 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1944 .to_string(),
1945 }),
1946 )?;
1947 }
1948 Op::RevokeRole {
1949 role_id,
1950 member_id,
1951 grantor_id,
1952 } => {
1953 state.ensure_not_reserved_role(&member_id)?;
1954 state.ensure_grantable_role(&role_id)?;
1955 let mut member_role = state.get_role(&member_id).clone();
1956 member_role.membership.map.remove(&role_id);
1957 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1958
1959 CatalogState::add_to_audit_log(
1960 &state.system_configuration,
1961 oracle_write_ts,
1962 session,
1963 tx,
1964 audit_events,
1965 EventType::Revoke,
1966 ObjectType::Role,
1967 EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1968 role_id: role_id.to_string(),
1969 member_id: member_id.to_string(),
1970 grantor_id: grantor_id.to_string(),
1971 executed_by: session
1972 .map(|session| session.authenticated_role_id())
1973 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1974 .to_string(),
1975 }),
1976 )?;
1977 }
1978 Op::UpdatePrivilege {
1979 target_id,
1980 privilege,
1981 variant,
1982 } => {
1983 let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1984 UpdatePrivilegeVariant::Grant => {
1985 privileges.grant(privilege);
1986 }
1987 UpdatePrivilegeVariant::Revoke => {
1988 privileges.revoke(&privilege);
1989 }
1990 };
1991 match &target_id {
1992 SystemObjectId::Object(object_id) => match object_id {
1993 ObjectId::Cluster(id) => {
1994 let mut cluster = state.get_cluster(*id).clone();
1995 update_privilege_fn(&mut cluster.privileges);
1996 tx.update_cluster(*id, cluster.into())?;
1997 }
1998 ObjectId::Database(id) => {
1999 let mut database = state.get_database(id).clone();
2000 update_privilege_fn(&mut database.privileges);
2001 tx.update_database(*id, database.into())?;
2002 }
2003 ObjectId::NetworkPolicy(id) => {
2004 let mut policy = state.get_network_policy(id).clone();
2005 update_privilege_fn(&mut policy.privileges);
2006 tx.update_network_policy(*id, policy.into())?;
2007 }
2008 ObjectId::Schema((database_spec, schema_spec)) => {
2009 let schema_id = schema_spec.clone().into();
2010 let mut schema = state
2011 .get_schema(
2012 database_spec,
2013 schema_spec,
2014 session
2015 .map(|session| session.conn_id())
2016 .unwrap_or(&SYSTEM_CONN_ID),
2017 )
2018 .clone();
2019 update_privilege_fn(&mut schema.privileges);
2020 tx.update_schema(schema_id, schema.into())?;
2021 }
2022 ObjectId::Item(id) => {
2023 let entry = state.get_entry(id);
2024 let mut new_entry = entry.clone();
2025 update_privilege_fn(&mut new_entry.privileges);
2026 if !new_entry.item().is_temporary() {
2027 tx.update_item(*id, new_entry.into())?;
2028 } else {
2029 temporary_item_updates
2030 .push((entry.clone().into(), StateDiff::Retraction));
2031 temporary_item_updates
2032 .push((new_entry.into(), StateDiff::Addition));
2033 }
2034 }
2035 ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
2036 },
2037 SystemObjectId::System => {
2038 let mut system_privileges = PrivilegeMap::clone(&state.system_privileges);
2039 update_privilege_fn(&mut system_privileges);
2040 let new_privilege =
2041 system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
2042 tx.set_system_privilege(
2043 privilege.grantee,
2044 privilege.grantor,
2045 new_privilege.map(|new_privilege| new_privilege.acl_mode),
2046 )?;
2047 }
2048 }
2049 let object_type = state.get_system_object_type(&target_id);
2050 let object_id_str = match &target_id {
2051 SystemObjectId::System => "SYSTEM".to_string(),
2052 SystemObjectId::Object(id) => id.to_string(),
2053 };
2054 CatalogState::add_to_audit_log(
2055 &state.system_configuration,
2056 oracle_write_ts,
2057 session,
2058 tx,
2059 audit_events,
2060 variant.into(),
2061 system_object_type_to_audit_object_type(&object_type),
2062 EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
2063 object_id: object_id_str,
2064 grantee_id: privilege.grantee.to_string(),
2065 grantor_id: privilege.grantor.to_string(),
2066 privileges: privilege.acl_mode.to_string(),
2067 }),
2068 )?;
2069 }
2070 Op::UpdateDefaultPrivilege {
2071 privilege_object,
2072 privilege_acl_item,
2073 variant,
2074 } => {
2075 let mut default_privileges = DefaultPrivileges::clone(&state.default_privileges);
2076 match variant {
2077 UpdatePrivilegeVariant::Grant => default_privileges
2078 .grant(privilege_object.clone(), privilege_acl_item.clone()),
2079 UpdatePrivilegeVariant::Revoke => {
2080 default_privileges.revoke(&privilege_object, &privilege_acl_item)
2081 }
2082 }
2083 let new_acl_mode = default_privileges
2084 .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
2085 tx.set_default_privilege(
2086 privilege_object.role_id,
2087 privilege_object.database_id,
2088 privilege_object.schema_id,
2089 privilege_object.object_type,
2090 privilege_acl_item.grantee,
2091 new_acl_mode.cloned(),
2092 )?;
2093 CatalogState::add_to_audit_log(
2094 &state.system_configuration,
2095 oracle_write_ts,
2096 session,
2097 tx,
2098 audit_events,
2099 variant.into(),
2100 object_type_to_audit_object_type(privilege_object.object_type),
2101 EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
2102 role_id: privilege_object.role_id.to_string(),
2103 database_id: privilege_object.database_id.map(|id| id.to_string()),
2104 schema_id: privilege_object.schema_id.map(|id| id.to_string()),
2105 grantee_id: privilege_acl_item.grantee.to_string(),
2106 privileges: privilege_acl_item.acl_mode.to_string(),
2107 }),
2108 )?;
2109 }
2110 Op::RenameCluster {
2111 id,
2112 name,
2113 to_name,
2114 check_reserved_names,
2115 } => {
2116 if id.is_system() {
2117 return Err(AdapterError::Catalog(Error::new(
2118 ErrorKind::ReadOnlyCluster(name.clone()),
2119 )));
2120 }
2121 if check_reserved_names && is_reserved_name(&to_name) {
2122 return Err(AdapterError::Catalog(Error::new(
2123 ErrorKind::ReservedClusterName(to_name),
2124 )));
2125 }
2126 tx.rename_cluster(id, &name, &to_name)?;
2127 CatalogState::add_to_audit_log(
2128 &state.system_configuration,
2129 oracle_write_ts,
2130 session,
2131 tx,
2132 audit_events,
2133 EventType::Alter,
2134 ObjectType::Cluster,
2135 EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
2136 id: id.to_string(),
2137 old_name: name.clone(),
2138 new_name: to_name.clone(),
2139 }),
2140 )?;
2141 info!("rename cluster {name} to {to_name}");
2142 }
2143 Op::RenameClusterReplica {
2144 cluster_id,
2145 replica_id,
2146 name,
2147 to_name,
2148 } => {
2149 if is_reserved_name(&to_name) {
2150 return Err(AdapterError::Catalog(Error::new(
2151 ErrorKind::ReservedReplicaName(to_name),
2152 )));
2153 }
2154 tx.rename_cluster_replica(replica_id, &name, &to_name)?;
2155 CatalogState::add_to_audit_log(
2156 &state.system_configuration,
2157 oracle_write_ts,
2158 session,
2159 tx,
2160 audit_events,
2161 EventType::Alter,
2162 ObjectType::ClusterReplica,
2163 EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
2164 cluster_id: cluster_id.to_string(),
2165 replica_id: replica_id.to_string(),
2166 old_name: name.replica.as_str().to_string(),
2167 new_name: to_name.clone(),
2168 }),
2169 )?;
2170 info!("rename cluster replica {name} to {to_name}");
2171 }
2172 Op::RenameItem {
2173 id,
2174 to_name,
2175 current_full_name,
2176 } => {
2177 let mut updates = Vec::new();
2178
2179 let entry = state.get_entry(&id);
2180 if let CatalogItem::Type(_) = entry.item() {
2181 return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
2182 current_full_name.to_string(),
2183 ))));
2184 }
2185
2186 if entry.id().is_system() {
2187 let name = state
2188 .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2189 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2190 name.to_string(),
2191 ))));
2192 }
2193
2194 let mut to_full_name = current_full_name.clone();
2195 to_full_name.item.clone_from(&to_name);
2196
2197 let mut to_qualified_name = entry.name().clone();
2198 to_qualified_name.item.clone_from(&to_name);
2199
2200 let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2201 id: id.to_string(),
2202 old_name: Self::full_name_detail(¤t_full_name),
2203 new_name: Self::full_name_detail(&to_full_name),
2204 });
2205 if Self::should_audit_log_item(entry.item()) {
2206 CatalogState::add_to_audit_log(
2207 &state.system_configuration,
2208 oracle_write_ts,
2209 session,
2210 tx,
2211 audit_events,
2212 EventType::Alter,
2213 catalog_type_to_audit_object_type(entry.item().typ()),
2214 details,
2215 )?;
2216 }
2217
2218 let mut new_entry = entry.clone();
2220 new_entry.name.item.clone_from(&to_name);
2221 new_entry.item = entry
2222 .item()
2223 .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2224 .map_err(|e| {
2225 Error::new(ErrorKind::from(AmbiguousRename {
2226 depender: state
2227 .resolve_full_name(entry.name(), entry.conn_id())
2228 .to_string(),
2229 dependee: state
2230 .resolve_full_name(entry.name(), entry.conn_id())
2231 .to_string(),
2232 message: e,
2233 }))
2234 })?;
2235
2236 for id in entry.referenced_by() {
2237 let dependent_item = state.get_entry(id);
2238 let mut to_entry = dependent_item.clone();
2239 to_entry.item = dependent_item
2240 .item()
2241 .rename_item_refs(
2242 current_full_name.clone(),
2243 to_full_name.item.clone(),
2244 false,
2245 )
2246 .map_err(|e| {
2247 Error::new(ErrorKind::from(AmbiguousRename {
2248 depender: state
2249 .resolve_full_name(
2250 dependent_item.name(),
2251 dependent_item.conn_id(),
2252 )
2253 .to_string(),
2254 dependee: state
2255 .resolve_full_name(entry.name(), entry.conn_id())
2256 .to_string(),
2257 message: e,
2258 }))
2259 })?;
2260
2261 if !to_entry.item().is_temporary() {
2262 tx.update_item(*id, to_entry.into())?;
2263 } else {
2264 temporary_item_updates
2265 .push((dependent_item.clone().into(), StateDiff::Retraction));
2266 temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2267 }
2268 updates.push(*id);
2269 }
2270 if !new_entry.item().is_temporary() {
2271 tx.update_item(id, new_entry.into())?;
2272 } else {
2273 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2274 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2275 }
2276
2277 updates.push(id);
2278 for id in updates {
2279 Self::log_update(state, &id);
2280 }
2281 }
2282 Op::RenameSchema {
2283 database_spec,
2284 schema_spec,
2285 new_name,
2286 check_reserved_names,
2287 } => {
2288 if check_reserved_names && is_reserved_name(&new_name) {
2289 return Err(AdapterError::Catalog(Error::new(
2290 ErrorKind::ReservedSchemaName(new_name),
2291 )));
2292 }
2293
2294 let conn_id = session
2295 .map(|session| session.conn_id())
2296 .unwrap_or(&SYSTEM_CONN_ID);
2297
2298 let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2299 let cur_name = schema.name().schema.clone();
2300
2301 let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2302 return Err(AdapterError::Catalog(Error::new(
2303 ErrorKind::AmbientSchemaRename(cur_name),
2304 )));
2305 };
2306 let database = state.get_database(&database_id);
2307 let database_name = &database.name;
2308
2309 let mut updates = Vec::new();
2310 let mut items_to_update = BTreeMap::new();
2311
2312 let mut update_item = |id| {
2313 if items_to_update.contains_key(id) {
2314 return Ok(());
2315 }
2316
2317 let entry = state.get_entry(id);
2318
2319 let mut new_entry = entry.clone();
2321 new_entry.item = entry
2322 .item
2323 .rename_schema_refs(database_name, &cur_name, &new_name)
2324 .map_err(|(s, _i)| {
2325 Error::new(ErrorKind::from(AmbiguousRename {
2326 depender: state
2327 .resolve_full_name(entry.name(), entry.conn_id())
2328 .to_string(),
2329 dependee: format!("{database_name}.{cur_name}"),
2330 message: format!("ambiguous reference to schema named {s}"),
2331 }))
2332 })?;
2333
2334 if !new_entry.item().is_temporary() {
2336 items_to_update.insert(*id, new_entry.into());
2337 } else {
2338 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2339 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2340 }
2341 updates.push(id);
2342
2343 Ok::<_, AdapterError>(())
2344 };
2345
2346 for (_name, item_id) in &schema.items {
2348 update_item(item_id)?;
2350
2351 for id in state.get_entry(item_id).referenced_by() {
2353 update_item(id)?;
2354 }
2355 }
2356 tx.update_items(items_to_update)?;
2359
2360 let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2362 let schema_name = schema.name().schema.clone();
2363 return Err(AdapterError::Catalog(crate::catalog::Error::new(
2364 crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2365 )));
2366 };
2367
2368 let database_name = database_spec
2370 .id()
2371 .map(|id| state.get_database(&id).name.clone());
2372 let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2373 id: schema_id.to_string(),
2374 old_name: schema.name().schema.clone(),
2375 new_name: new_name.clone(),
2376 database_name,
2377 });
2378 CatalogState::add_to_audit_log(
2379 &state.system_configuration,
2380 oracle_write_ts,
2381 session,
2382 tx,
2383 audit_events,
2384 EventType::Alter,
2385 mz_audit_log::ObjectType::Schema,
2386 details,
2387 )?;
2388
2389 let mut new_schema = schema.clone();
2391 new_schema.name.schema.clone_from(&new_name);
2392 tx.update_schema(schema_id, new_schema.into())?;
2393
2394 for id in updates {
2395 Self::log_update(state, id);
2396 }
2397 }
2398 Op::UpdateOwner { id, new_owner } => {
2399 let conn_id = session
2400 .map(|session| session.conn_id())
2401 .unwrap_or(&SYSTEM_CONN_ID);
2402 let old_owner = state
2403 .get_owner_id(&id, conn_id)
2404 .expect("cannot update the owner of an object without an owner");
2405 match &id {
2406 ObjectId::Cluster(id) => {
2407 let mut cluster = state.get_cluster(*id).clone();
2408 if id.is_system() {
2409 return Err(AdapterError::Catalog(Error::new(
2410 ErrorKind::ReadOnlyCluster(cluster.name),
2411 )));
2412 }
2413 Self::update_privilege_owners(
2414 &mut cluster.privileges,
2415 cluster.owner_id,
2416 new_owner,
2417 );
2418 cluster.owner_id = new_owner;
2419 tx.update_cluster(*id, cluster.into())?;
2420 }
2421 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2422 let cluster = state.get_cluster(*cluster_id);
2423 let mut replica = cluster
2424 .replica(*replica_id)
2425 .expect("catalog out of sync")
2426 .clone();
2427 if replica_id.is_system() {
2428 return Err(AdapterError::Catalog(Error::new(
2429 ErrorKind::ReadOnlyClusterReplica(replica.name),
2430 )));
2431 }
2432 replica.owner_id = new_owner;
2433 tx.update_cluster_replica(*replica_id, replica.into())?;
2434 }
2435 ObjectId::Database(id) => {
2436 let mut database = state.get_database(id).clone();
2437 if id.is_system() {
2438 return Err(AdapterError::Catalog(Error::new(
2439 ErrorKind::ReadOnlyDatabase(database.name),
2440 )));
2441 }
2442 Self::update_privilege_owners(
2443 &mut database.privileges,
2444 database.owner_id,
2445 new_owner,
2446 );
2447 database.owner_id = new_owner;
2448 tx.update_database(*id, database.clone().into())?;
2449 }
2450 ObjectId::Schema((database_spec, schema_spec)) => {
2451 let schema_id: SchemaId = schema_spec.clone().into();
2452 let mut schema = state
2453 .get_schema(database_spec, schema_spec, conn_id)
2454 .clone();
2455 if schema_id.is_system() {
2456 let name = schema.name();
2457 let full_name = state.resolve_full_schema_name(name);
2458 return Err(AdapterError::Catalog(Error::new(
2459 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2460 )));
2461 }
2462 Self::update_privilege_owners(
2463 &mut schema.privileges,
2464 schema.owner_id,
2465 new_owner,
2466 );
2467 schema.owner_id = new_owner;
2468 tx.update_schema(schema_id, schema.into())?;
2469 }
2470 ObjectId::Item(id) => {
2471 let entry = state.get_entry(id);
2472 let mut new_entry = entry.clone();
2473 if id.is_system() {
2474 let full_name = state.resolve_full_name(
2475 new_entry.name(),
2476 session.map(|session| session.conn_id()),
2477 );
2478 return Err(AdapterError::Catalog(Error::new(
2479 ErrorKind::ReadOnlyItem(full_name.to_string()),
2480 )));
2481 }
2482 Self::update_privilege_owners(
2483 &mut new_entry.privileges,
2484 new_entry.owner_id,
2485 new_owner,
2486 );
2487 new_entry.owner_id = new_owner;
2488 if !new_entry.item().is_temporary() {
2489 tx.update_item(*id, new_entry.into())?;
2490 } else {
2491 temporary_item_updates
2492 .push((entry.clone().into(), StateDiff::Retraction));
2493 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2494 }
2495 }
2496 ObjectId::NetworkPolicy(id) => {
2497 let mut policy = state.get_network_policy(id).clone();
2498 if id.is_system() {
2499 return Err(AdapterError::Catalog(Error::new(
2500 ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2501 )));
2502 }
2503 Self::update_privilege_owners(
2504 &mut policy.privileges,
2505 policy.owner_id,
2506 new_owner,
2507 );
2508 policy.owner_id = new_owner;
2509 tx.update_network_policy(*id, policy.into())?;
2510 }
2511 ObjectId::Role(_) => unreachable!("roles have no owner"),
2512 }
2513 let object_type = state.get_object_type(&id);
2514 CatalogState::add_to_audit_log(
2515 &state.system_configuration,
2516 oracle_write_ts,
2517 session,
2518 tx,
2519 audit_events,
2520 EventType::Alter,
2521 object_type_to_audit_object_type(object_type),
2522 EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2523 object_id: id.to_string(),
2524 old_owner_id: old_owner.to_string(),
2525 new_owner_id: new_owner.to_string(),
2526 }),
2527 )?;
2528 }
2529 Op::UpdateClusterConfig { id, name, config } => {
2530 let mut cluster = state.get_cluster(id).clone();
2531 cluster.config = config;
2532 tx.update_cluster(id, cluster.into())?;
2533 info!("update cluster {}", name);
2534
2535 CatalogState::add_to_audit_log(
2536 &state.system_configuration,
2537 oracle_write_ts,
2538 session,
2539 tx,
2540 audit_events,
2541 EventType::Alter,
2542 ObjectType::Cluster,
2543 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2544 id: id.to_string(),
2545 name,
2546 }),
2547 )?;
2548 }
2549 Op::UpdateClusterReplicaConfig {
2550 replica_id,
2551 cluster_id,
2552 config,
2553 } => {
2554 let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2555 info!("update replica {}", replica.name);
2556 tx.update_cluster_replica(
2557 replica_id,
2558 mz_catalog::durable::ClusterReplica {
2559 cluster_id,
2560 replica_id,
2561 name: replica.name.clone(),
2562 config: config.clone().into(),
2563 owner_id: replica.owner_id,
2564 },
2565 )?;
2566 }
2567 Op::UpdateItem { id, name, to_item } => {
2568 let mut entry = state.get_entry(&id).clone();
2569 entry.name = name.clone();
2570 entry.item = to_item.clone();
2571 tx.update_item(id, entry.into())?;
2572
2573 if Self::should_audit_log_item(&to_item) {
2574 let mut full_name = Self::full_name_detail(
2575 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2576 );
2577 full_name.item = name.item;
2578
2579 CatalogState::add_to_audit_log(
2580 &state.system_configuration,
2581 oracle_write_ts,
2582 session,
2583 tx,
2584 audit_events,
2585 EventType::Alter,
2586 catalog_type_to_audit_object_type(to_item.typ()),
2587 EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2588 id: id.to_string(),
2589 name: full_name,
2590 }),
2591 )?;
2592 }
2593
2594 Self::log_update(state, &id);
2595 }
2596 Op::UpdateSystemConfiguration { name, value } => {
2597 let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2598 tx.upsert_system_config(&name, parsed_value.clone())?;
2599 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2604 let with_0dt_deployment_max_wait =
2605 Duration::parse(VarInput::Flat(&parsed_value))
2606 .expect("parsing succeeded above");
2607 tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2608 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2609 let with_0dt_deployment_ddl_check_interval =
2610 Duration::parse(VarInput::Flat(&parsed_value))
2611 .expect("parsing succeeded above");
2612 tx.set_0dt_deployment_ddl_check_interval(
2613 with_0dt_deployment_ddl_check_interval,
2614 )?;
2615 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2616 let panic_after_timeout =
2617 strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2618 tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2619 }
2620
2621 CatalogState::add_to_audit_log(
2622 &state.system_configuration,
2623 oracle_write_ts,
2624 session,
2625 tx,
2626 audit_events,
2627 EventType::Alter,
2628 ObjectType::System,
2629 EventDetails::SetV1(mz_audit_log::SetV1 {
2630 name,
2631 value: Some(value.borrow().to_vec().join(", ")),
2632 }),
2633 )?;
2634 }
2635 Op::ResetSystemConfiguration { name } => {
2636 tx.remove_system_config(&name);
2637 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2642 tx.reset_0dt_deployment_max_wait()?;
2643 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2644 tx.reset_0dt_deployment_ddl_check_interval()?;
2645 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2646 tx.reset_enable_0dt_deployment_panic_after_timeout()?;
2647 }
2648
2649 CatalogState::add_to_audit_log(
2650 &state.system_configuration,
2651 oracle_write_ts,
2652 session,
2653 tx,
2654 audit_events,
2655 EventType::Alter,
2656 ObjectType::System,
2657 EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2658 )?;
2659 }
2660 Op::ResetAllSystemConfiguration => {
2661 tx.clear_system_configs();
2662 tx.reset_0dt_deployment_max_wait()?;
2663 tx.reset_0dt_deployment_ddl_check_interval()?;
2664 tx.reset_enable_0dt_deployment_panic_after_timeout()?;
2665
2666 CatalogState::add_to_audit_log(
2667 &state.system_configuration,
2668 oracle_write_ts,
2669 session,
2670 tx,
2671 audit_events,
2672 EventType::Alter,
2673 ObjectType::System,
2674 EventDetails::ResetAllV1,
2675 )?;
2676 }
2677 Op::WeirdStorageUsageUpdates {
2678 object_id,
2679 size_bytes,
2680 collection_timestamp,
2681 } => {
2682 let id = tx.allocate_storage_usage_ids()?;
2683 let metric =
2684 VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2685 let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2686 let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2687 weird_builtin_table_update = Some(builtin_table_update);
2688 }
2689 Op::InjectAuditEvents { events } => {
2690 for event in events {
2691 let id = tx.allocate_audit_log_id()?;
2692 let ev = VersionedEvent::new(
2693 id,
2694 event.event_type,
2695 event.object_type,
2696 event.details,
2697 event.user,
2698 oracle_write_ts.into(),
2699 );
2700 audit_events.push(ev.clone());
2701 tx.insert_audit_log_event(ev);
2702 }
2703 }
2704 };
2705 Ok((weird_builtin_table_update, temporary_item_updates))
2706 }
2707
2708 fn log_update(state: &CatalogState, id: &CatalogItemId) {
2709 let entry = state.get_entry(id);
2710 info!(
2711 "update {} {} ({})",
2712 entry.item_type(),
2713 state.resolve_full_name(entry.name(), entry.conn_id()),
2714 id
2715 );
2716 }
2717
2718 fn update_privilege_owners(
2722 privileges: &mut PrivilegeMap,
2723 old_owner: RoleId,
2724 new_owner: RoleId,
2725 ) {
2726 let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2728
2729 let mut new_present = false;
2730 for privilege in flat_privileges.iter_mut() {
2731 if privilege.grantor == old_owner {
2734 privilege.grantor = new_owner;
2735 } else if privilege.grantor == new_owner {
2736 new_present = true;
2737 }
2738 if privilege.grantee == old_owner {
2740 privilege.grantee = new_owner;
2741 } else if privilege.grantee == new_owner {
2742 new_present = true;
2743 }
2744 }
2745
2746 if new_present {
2750 let privilege_map: BTreeMap<_, Vec<_>> =
2752 flat_privileges
2753 .into_iter()
2754 .fold(BTreeMap::new(), |mut accum, privilege| {
2755 accum
2756 .entry((privilege.grantee, privilege.grantor))
2757 .or_default()
2758 .push(privilege);
2759 accum
2760 });
2761
2762 flat_privileges = privilege_map
2764 .into_iter()
2765 .map(|((grantee, grantor), values)|
2766 values.into_iter().fold(
2768 MzAclItem::empty(grantee, grantor),
2769 |mut accum, mz_aclitem| {
2770 accum.acl_mode =
2771 accum.acl_mode.union(mz_aclitem.acl_mode);
2772 accum
2773 },
2774 ))
2775 .collect();
2776 }
2777
2778 *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2779 }
2780}
2781
2782fn tx_replace_item(
2791 tx: &mut Transaction<'_>,
2792 state: &CatalogState,
2793 id: CatalogItemId,
2794 new_entry: CatalogEntry,
2795) -> Result<(), AdapterError> {
2796 let new_id = new_entry.id;
2797
2798 for use_id in new_entry.referenced_by() {
2800 if tx.get_item(use_id).is_none() {
2802 continue;
2803 }
2804
2805 let mut dependent = state.get_entry(use_id).clone();
2806 dependent.item = dependent.item.replace_item_refs(id, new_id);
2807 tx.update_item(*use_id, dependent.into())?;
2808 }
2809
2810 let old_comment_id = state.get_comment_id(ObjectId::Item(id));
2812 let new_comment_id = new_entry.comment_object_id();
2813 if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
2814 tx.drop_comments(&[old_comment_id].into())?;
2815 for (sub, comment) in comments {
2816 tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
2817 }
2818 }
2819
2820 let mz_catalog::durable::Item {
2821 id: _,
2822 oid,
2823 global_id,
2824 schema_id,
2825 name,
2826 create_sql,
2827 owner_id,
2828 privileges,
2829 extra_versions,
2830 } = new_entry.into();
2831
2832 tx.remove_item(id)?;
2833 tx.insert_item(
2834 new_id,
2835 oid,
2836 global_id,
2837 schema_id,
2838 &name,
2839 create_sql,
2840 owner_id,
2841 privileges,
2842 extra_versions,
2843 )?;
2844
2845 Ok(())
2846}
2847
2848fn apply_replacement_audit_events(
2850 state: &CatalogState,
2851 target: &CatalogEntry,
2852 replacement: &CatalogEntry,
2853) -> Vec<(EventType, EventDetails)> {
2854 let mut events = Vec::new();
2855
2856 let target_name = state.resolve_full_name(target.name(), target.conn_id());
2857 let target_id_name = IdFullNameV1 {
2858 id: target.id().to_string(),
2859 name: Catalog::full_name_detail(&target_name),
2860 };
2861 let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2862 let replacement_id_name = IdFullNameV1 {
2863 id: replacement.id().to_string(),
2864 name: Catalog::full_name_detail(&replacement_name),
2865 };
2866
2867 if Catalog::should_audit_log_item(&replacement.item) {
2868 events.push((
2869 EventType::Drop,
2870 EventDetails::IdFullNameV1(replacement_id_name.clone()),
2871 ));
2872 }
2873
2874 if Catalog::should_audit_log_item(&target.item) {
2875 events.push((
2876 EventType::Alter,
2877 EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2878 target: target_id_name.clone(),
2879 replacement: replacement_id_name,
2880 }),
2881 ));
2882
2883 if let Some(old_cluster_id) = target.cluster_id()
2884 && let Some(new_cluster_id) = replacement.cluster_id()
2885 && old_cluster_id != new_cluster_id
2886 {
2887 events.push((
2890 EventType::Alter,
2891 EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
2892 id: replacement.id().to_string(),
2893 name: target_id_name.name,
2894 old_cluster_id: old_cluster_id.to_string(),
2895 new_cluster_id: new_cluster_id.to_string(),
2896 }),
2897 ));
2898 }
2899 }
2900
2901 events
2902}
2903
2904#[derive(Debug, Default)]
2912pub(crate) struct ObjectsToDrop {
2913 pub comments: BTreeSet<CommentObjectId>,
2914 pub databases: BTreeSet<DatabaseId>,
2915 pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2916 pub clusters: BTreeSet<ClusterId>,
2917 pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2918 pub roles: BTreeSet<RoleId>,
2919 pub items: Vec<CatalogItemId>,
2920 pub network_policies: BTreeSet<NetworkPolicyId>,
2921}
2922
2923impl ObjectsToDrop {
2924 pub fn generate(
2925 drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2926 state: &CatalogState,
2927 session: Option<&ConnMeta>,
2928 ) -> Result<Self, AdapterError> {
2929 let mut delta = ObjectsToDrop::default();
2930
2931 for drop_object_info in drop_object_infos {
2932 delta.add_item(drop_object_info, state, session)?;
2933 }
2934
2935 Ok(delta)
2936 }
2937
2938 fn add_item(
2939 &mut self,
2940 drop_object_info: DropObjectInfo,
2941 state: &CatalogState,
2942 session: Option<&ConnMeta>,
2943 ) -> Result<(), AdapterError> {
2944 self.comments
2945 .insert(state.get_comment_id(drop_object_info.to_object_id()));
2946
2947 match drop_object_info {
2948 DropObjectInfo::Database(database_id) => {
2949 let database = &state.database_by_id[&database_id];
2950 if database_id.is_system() {
2951 return Err(AdapterError::Catalog(Error::new(
2952 ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2953 )));
2954 }
2955
2956 self.databases.insert(database_id);
2957 }
2958 DropObjectInfo::Schema((database_spec, schema_spec)) => {
2959 let schema = state.get_schema(
2960 &database_spec,
2961 &schema_spec,
2962 session
2963 .map(|session| session.conn_id())
2964 .unwrap_or(&SYSTEM_CONN_ID),
2965 );
2966 let schema_id: SchemaId = schema_spec.into();
2967 if schema_id.is_system() {
2968 let name = schema.name();
2969 let full_name = state.resolve_full_schema_name(name);
2970 return Err(AdapterError::Catalog(Error::new(
2971 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2972 )));
2973 }
2974
2975 self.schemas.insert(schema_spec, database_spec);
2976 }
2977 DropObjectInfo::Role(role_id) => {
2978 let name = state.get_role(&role_id).name().to_string();
2979 if role_id.is_system() || role_id.is_predefined() {
2980 return Err(AdapterError::Catalog(Error::new(
2981 ErrorKind::ReservedRoleName(name.clone()),
2982 )));
2983 }
2984 state.ensure_not_reserved_role(&role_id)?;
2985
2986 self.roles.insert(role_id);
2987 }
2988 DropObjectInfo::Cluster(cluster_id) => {
2989 let cluster = state.get_cluster(cluster_id);
2990 let name = &cluster.name;
2991 if cluster_id.is_system() {
2992 return Err(AdapterError::Catalog(Error::new(
2993 ErrorKind::ReadOnlyCluster(name.clone()),
2994 )));
2995 }
2996
2997 self.clusters.insert(cluster_id);
2998 }
2999 DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
3000 let cluster = state.get_cluster(cluster_id);
3001 let replica = cluster.replica(replica_id).expect("Must exist");
3002
3003 self.replicas
3004 .insert(replica.replica_id, (cluster.id, reason));
3005
3006 let mut seen: BTreeSet<ObjectId> =
3017 self.items.iter().copied().map(ObjectId::Item).collect();
3018 for item_id in &cluster.bound_objects {
3019 let entry = state.get_entry(item_id);
3020 if let CatalogItem::MaterializedView(mv) = entry.item()
3021 && mv.target_replica == Some(replica_id)
3022 && !seen.contains(&ObjectId::Item(*item_id))
3023 {
3024 info!(
3025 "implicitly dropping materialized view {} because target replica was dropped",
3026 entry.name().item,
3027 );
3028 for dep in state.item_dependents(*item_id, &mut seen) {
3029 if let ObjectId::Item(dep_id) = dep {
3030 self.items.push(dep_id);
3031 }
3032 }
3033 }
3034 }
3035 }
3036 DropObjectInfo::Item(item_id) => {
3037 let entry = state.get_entry(&item_id);
3038 if item_id.is_system() {
3039 let name = entry.name();
3040 let full_name =
3041 state.resolve_full_name(name, session.map(|session| session.conn_id()));
3042 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
3043 full_name.to_string(),
3044 ))));
3045 }
3046
3047 self.items.push(item_id);
3048 }
3049 DropObjectInfo::NetworkPolicy(network_policy_id) => {
3050 let policy = state.get_network_policy(&network_policy_id);
3051 let name = &policy.name;
3052 if network_policy_id.is_system() {
3053 return Err(AdapterError::Catalog(Error::new(
3054 ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
3055 )));
3056 }
3057
3058 self.network_policies.insert(network_policy_id);
3059 }
3060 }
3061
3062 Ok(())
3063 }
3064}
3065
3066#[cfg(test)]
3067mod tests {
3068 use mz_catalog::SYSTEM_CONN_ID;
3069 use mz_catalog::memory::objects::{CatalogItem, Table, TableDataSource};
3070 use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
3071 use mz_repr::role_id::RoleId;
3072 use mz_repr::{RelationDesc, RelationVersion, VersionedRelationDesc};
3073 use mz_sql::DEFAULT_SCHEMA;
3074 use mz_sql::catalog::CatalogDatabase;
3075 use mz_sql::names::{
3076 ItemQualifiers, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
3077 };
3078 use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
3079
3080 use crate::catalog::{Catalog, Op};
3081 use crate::session::DEFAULT_DATABASE_NAME;
3082
3083 #[mz_ore::test]
3084 fn test_update_privilege_owners() {
3085 let old_owner = RoleId::User(1);
3086 let new_owner = RoleId::User(2);
3087 let other_role = RoleId::User(3);
3088
3089 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3091 MzAclItem {
3092 grantee: other_role,
3093 grantor: old_owner,
3094 acl_mode: AclMode::UPDATE,
3095 },
3096 MzAclItem {
3097 grantee: other_role,
3098 grantor: new_owner,
3099 acl_mode: AclMode::SELECT,
3100 },
3101 ]);
3102 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3103 assert_eq!(1, privileges.all_values().count());
3104 assert_eq!(
3105 vec![MzAclItem {
3106 grantee: other_role,
3107 grantor: new_owner,
3108 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3109 }],
3110 privileges.all_values_owned().collect::<Vec<_>>()
3111 );
3112
3113 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3115 MzAclItem {
3116 grantee: old_owner,
3117 grantor: other_role,
3118 acl_mode: AclMode::UPDATE,
3119 },
3120 MzAclItem {
3121 grantee: new_owner,
3122 grantor: other_role,
3123 acl_mode: AclMode::SELECT,
3124 },
3125 ]);
3126 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3127 assert_eq!(1, privileges.all_values().count());
3128 assert_eq!(
3129 vec![MzAclItem {
3130 grantee: new_owner,
3131 grantor: other_role,
3132 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3133 }],
3134 privileges.all_values_owned().collect::<Vec<_>>()
3135 );
3136
3137 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3139 MzAclItem {
3140 grantee: old_owner,
3141 grantor: old_owner,
3142 acl_mode: AclMode::UPDATE,
3143 },
3144 MzAclItem {
3145 grantee: new_owner,
3146 grantor: new_owner,
3147 acl_mode: AclMode::SELECT,
3148 },
3149 ]);
3150 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3151 assert_eq!(1, privileges.all_values().count());
3152 assert_eq!(
3153 vec![MzAclItem {
3154 grantee: new_owner,
3155 grantor: new_owner,
3156 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3157 }],
3158 privileges.all_values_owned().collect::<Vec<_>>()
3159 );
3160 }
3161
3162 #[mz_ore::test(tokio::test)]
3169 #[cfg_attr(miri, ignore)] async fn test_transact_incremental_dry_run_processes_only_new_ops() {
3171 Catalog::with_debug(|catalog| async move {
3172 let database = catalog
3174 .resolve_database(DEFAULT_DATABASE_NAME)
3175 .expect("default database");
3176 let database_name = database.name.clone();
3177 let database_spec = ResolvedDatabaseSpecifier::Id(database.id());
3178 let schema = catalog
3179 .resolve_schema_in_database(&database_spec, DEFAULT_SCHEMA, &SYSTEM_CONN_ID)
3180 .expect("default schema");
3181 let schema_name = schema.name.schema.clone();
3182 let schema_spec = schema.id.clone();
3183
3184 let (id_t1, global_id_t1) = catalog
3186 .allocate_user_id_for_test()
3187 .await
3188 .expect("allocate id for t1");
3189 let (id_t2, global_id_t2) = catalog
3190 .allocate_user_id_for_test()
3191 .await
3192 .expect("allocate id for t2");
3193
3194 let oracle_write_ts = catalog.current_upper().await;
3195
3196 let make_table_op = |id, global_id, name: &str| Op::CreateItem {
3198 id,
3199 name: QualifiedItemName {
3200 qualifiers: ItemQualifiers {
3201 database_spec: database_spec.clone(),
3202 schema_spec: schema_spec.clone(),
3203 },
3204 item: name.to_string(),
3205 },
3206 item: CatalogItem::Table(Table {
3207 create_sql: Some(format!(
3208 "CREATE TABLE {database_name}.{schema_name}.{name} ()"
3209 )),
3210 desc: VersionedRelationDesc::new(RelationDesc::empty()),
3211 collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
3212 conn_id: None,
3213 resolved_ids: ResolvedIds::empty(),
3214 custom_logical_compaction_window: None,
3215 is_retained_metrics_object: false,
3216 data_source: TableDataSource::TableWrites { defaults: vec![] },
3217 }),
3218 owner_id: MZ_SYSTEM_ROLE_ID,
3219 };
3220
3221 let op_t1 = make_table_op(id_t1, global_id_t1, "t1");
3222 let op_t2 = make_table_op(id_t2, global_id_t2, "t2");
3223
3224 let base_state = catalog.state().clone();
3225
3226 let (state_after_t1, snapshot_after_t1) = catalog
3230 .transact_incremental_dry_run(
3231 &base_state,
3232 vec![op_t1.clone()],
3233 None,
3234 None,
3235 oracle_write_ts,
3236 )
3237 .await
3238 .expect("first dry run");
3239
3240 assert!(
3242 state_after_t1.try_get_entry(&id_t1).is_some(),
3243 "t1 should exist after first dry run"
3244 );
3245 assert_eq!(
3246 state_after_t1
3247 .try_get_entry(&id_t1)
3248 .expect("t1 entry")
3249 .name()
3250 .item,
3251 "t1"
3252 );
3253 assert!(
3254 state_after_t1.try_get_entry(&id_t2).is_none(),
3255 "t2 should NOT exist after first dry run"
3256 );
3257
3258 let (state_incremental, _) = catalog
3260 .transact_incremental_dry_run(
3261 &state_after_t1,
3262 vec![op_t2.clone()],
3263 None,
3264 Some(snapshot_after_t1),
3265 oracle_write_ts,
3266 )
3267 .await
3268 .expect("second dry run");
3269
3270 assert!(
3272 state_incremental.try_get_entry(&id_t1).is_some(),
3273 "t1 should exist in incremental result"
3274 );
3275 assert!(
3276 state_incremental.try_get_entry(&id_t2).is_some(),
3277 "t2 should exist in incremental result"
3278 );
3279
3280 let (state_all_at_once, _) = catalog
3283 .transact_incremental_dry_run(
3284 &base_state,
3285 vec![op_t1.clone(), op_t2.clone()],
3286 None,
3287 None,
3288 oracle_write_ts,
3289 )
3290 .await
3291 .expect("all-at-once dry run");
3292
3293 assert!(
3294 state_all_at_once.try_get_entry(&id_t1).is_some(),
3295 "t1 should exist in all-at-once result"
3296 );
3297 assert!(
3298 state_all_at_once.try_get_entry(&id_t2).is_some(),
3299 "t2 should exist in all-at-once result"
3300 );
3301
3302 let inc_t1 = state_incremental.try_get_entry(&id_t1).expect("inc t1");
3305 let all_t1 = state_all_at_once.try_get_entry(&id_t1).expect("all t1");
3306 assert_eq!(inc_t1.name(), all_t1.name());
3307 assert_eq!(inc_t1.owner_id, all_t1.owner_id);
3308
3309 let inc_t2 = state_incremental.try_get_entry(&id_t2).expect("inc t2");
3310 let all_t2 = state_all_at_once.try_get_entry(&id_t2).expect("all t2");
3311 assert_eq!(inc_t2.name(), all_t2.name());
3312 assert_eq!(inc_t2.owner_id, all_t2.owner_id);
3313
3314 catalog.expire().await;
3315 })
3316 .await
3317 }
3318}