1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22 WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26 ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Snapshot, Transaction};
31use mz_catalog::expr_cache::LocalExpressions;
32use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
33use mz_catalog::memory::objects::{
34 CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, DefaultPrivileges, SourceReferences,
35 StateDiff, StateUpdate, StateUpdateKind, TemporaryItem,
36};
37use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
38use mz_controller_types::{ClusterId, ReplicaId};
39use mz_ore::collections::HashSet;
40use mz_ore::instrument;
41use mz_persist_types::ShardId;
42use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
43use mz_repr::network_policy_id::NetworkPolicyId;
44use mz_repr::optimize::OptimizerFeatures;
45use mz_repr::role_id::RoleId;
46use mz_repr::{CatalogItemId, ColumnName, GlobalId, SqlColumnType, strconv};
47use mz_sql::ast::RawDataType;
48use mz_sql::catalog::{
49 AutoProvisionSource, CatalogDatabase, CatalogError as SqlCatalogError,
50 CatalogItem as SqlCatalogItem, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
51 DefaultPrivilegeObject, PasswordAction, PasswordConfig, RoleAttributesRaw, RoleMembership,
52 RoleVars,
53};
54use mz_sql::names::{
55 CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
56 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
57};
58use mz_sql::plan::{NetworkPolicyRule, PlanError};
59use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
60use mz_sql::session::vars::OwnedVarInput;
61use mz_sql::session::vars::{Value as VarValue, VarInput};
62use mz_sql::{DEFAULT_SCHEMA, rbac};
63use mz_sql_parser::ast::{QualifiedReplica, Value};
64use mz_storage_client::storage_collections::StorageCollections;
65use serde::{Deserialize, Serialize};
66use tracing::{info, trace};
67
68use crate::AdapterError;
69use crate::catalog::state::LocalExpressionCache;
70use crate::catalog::{
71 BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
72 catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
73 is_reserved_role_name, object_type_to_audit_object_type,
74 system_object_type_to_audit_object_type,
75};
76use crate::config::{ScopedParameters, ScopedParametersScope};
77use crate::coord::ConnMeta;
78use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
79use crate::coord::cluster_scheduling::SchedulingDecision;
80use crate::util::ResultExt;
81
/// A caller-supplied audit event, carried by [`Op::InjectAuditEvents`].
///
/// NOTE(review): the handler for `Op::InjectAuditEvents` is not visible here; field
/// descriptions below are derived from the field types only — confirm against the handler.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InjectedAuditEvent {
    /// The kind of event (an audit log [`EventType`]).
    pub event_type: EventType,
    /// The kind of object the event refers to (an audit log [`ObjectType`]).
    pub object_type: ObjectType,
    /// The event payload.
    pub details: EventDetails,
    /// Optional user string associated with the event — presumably the acting user's name;
    /// TODO confirm.
    pub user: Option<String>,
}
93
/// A single catalog operation.
///
/// A batch of `Op`s is passed to [`Catalog::transact`], which hands each one to
/// `Catalog::transact_op` in order.
#[derive(Debug, Clone)]
pub enum Op {
    /// Updates the retain-history setting of item `id` using `value` and `window`.
    ///
    /// Rejected with a read-only error when `id` is a system item.
    AlterRetainHistory {
        id: CatalogItemId,
        value: Option<Value>,
        window: CompactionWindow,
    },
    /// Updates the timestamp interval of item `id` using `value` and `interval`.
    ///
    /// Rejected with a read-only error when `id` is a system item.
    AlterSourceTimestampInterval {
        id: CatalogItemId,
        value: Option<Value>,
        interval: Duration,
    },
    /// Replaces the attributes and variables of role `id`.
    ///
    /// When `nopassword` is set the role's password is cleared; otherwise a password present
    /// in `attributes` is set, and a missing one leaves the password unchanged. `name` is
    /// used for the audit log entry and the log message.
    AlterRole {
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        nopassword: bool,
        vars: RoleVars,
    },
    /// Replaces the rules of network policy `id`.
    ///
    /// Fails if `name` is a reserved name. `owner_id` is not used by the handler.
    AlterNetworkPolicy {
        id: NetworkPolicyId,
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    /// Adds column `name` of type `typ` to the table `id`.
    ///
    /// `new_global_id` is recorded for the new table version and registered against the
    /// shard of the table's latest existing global ID. Fails for items that are not tables.
    AlterAddColumn {
        id: CatalogItemId,
        new_global_id: GlobalId,
        name: ColumnName,
        typ: SqlColumnType,
        sql: RawDataType,
    },
    /// Applies the materialized view `replacement_id` as a replacement of the materialized
    /// view `id`; the replacement's own catalog item and comments are removed.
    AlterMaterializedViewApplyReplacement {
        id: CatalogItemId,
        replacement_id: CatalogItemId,
    },
    /// Creates a user database called `name` together with its default schema, both owned by
    /// `owner_id`.
    CreateDatabase {
        name: String,
        owner_id: RoleId,
    },
    /// Creates a user schema called `schema_name` in `database_id`.
    ///
    /// Fails if the name is reserved or if `database_id` is
    /// [`ResolvedDatabaseSpecifier::Ambient`].
    CreateSchema {
        database_id: ResolvedDatabaseSpecifier,
        schema_name: String,
        owner_id: RoleId,
    },
    /// Creates a user role called `name`, with empty membership and default variables.
    ///
    /// Fails if the name is a reserved role name.
    CreateRole {
        name: String,
        attributes: RoleAttributesRaw,
    },
    /// Creates a user cluster, allocating an introspection source index ID for each entry
    /// of `introspection_sources`.
    ///
    /// Fails if the name is reserved.
    CreateCluster {
        id: ClusterId,
        name: String,
        introspection_sources: Vec<&'static BuiltinLog>,
        owner_id: RoleId,
        config: ClusterConfig,
    },
    /// Creates replica `replica_id` in cluster `cluster_id`.
    ///
    /// Fails if the name is reserved.
    CreateClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: String,
        config: ReplicaConfig,
        owner_id: RoleId,
        reason: ReplicaCreateDropReason,
    },
    /// Creates a catalog item.
    ///
    /// Items that report a connection ID are tracked as temporary items by
    /// `Catalog::temporary_ids`, and views/materialized views contribute their locally
    /// optimized expressions to the expression cache used while applying updates.
    CreateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        item: CatalogItem,
        owner_id: RoleId,
    },
    CreateNetworkPolicy {
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    Comment {
        object_id: CommentObjectId,
        sub_component: Option<usize>,
        comment: Option<String>,
    },
    /// Drops the listed objects.
    ///
    /// [`Catalog::transact`] inspects these up front to learn which temporary items are
    /// being dropped in the same batch.
    DropObjects(Vec<DropObjectInfo>),
    GrantRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    RenameCluster {
        id: ClusterId,
        name: String,
        to_name: String,
        check_reserved_names: bool,
    },
    RenameClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: QualifiedReplica,
        to_name: String,
    },
    RenameItem {
        id: CatalogItemId,
        current_full_name: FullItemName,
        to_name: String,
    },
    RenameSchema {
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        new_name: String,
        check_reserved_names: bool,
    },
    UpdateOwner {
        id: ObjectId,
        new_owner: RoleId,
    },
    UpdatePrivilege {
        target_id: SystemObjectId,
        privilege: MzAclItem,
        variant: UpdatePrivilegeVariant,
    },
    UpdateDefaultPrivilege {
        privilege_object: DefaultPrivilegeObject,
        privilege_acl_item: DefaultPrivilegeAclItem,
        variant: UpdatePrivilegeVariant,
    },
    RevokeRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    UpdateClusterConfig {
        id: ClusterId,
        name: String,
        config: ClusterConfig,
    },
    UpdateClusterReplicaConfig {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        config: ReplicaConfig,
    },
    UpdateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        to_item: CatalogItem,
    },
    UpdateSourceReferences {
        source_id: CatalogItemId,
        references: SourceReferences,
    },
    UpdateSystemConfiguration {
        name: String,
        value: OwnedVarInput,
    },
    ResetSystemConfiguration {
        name: String,
    },
    ResetAllSystemConfiguration,
    UpdateScopedSystemParameters {
        scoped: ScopedParameters,
        prune_scope: Option<ScopedParametersScope>,
    },
    /// Carries a list of [`InjectedAuditEvent`]s.
    InjectAuditEvents {
        events: Vec<InjectedAuditEvent>,
    },
}
273
/// Identifies one object to drop as part of [`Op::DropObjects`].
///
/// Mirrors the variants of [`ObjectId`], except that cluster replicas additionally carry the
/// [`ReplicaCreateDropReason`] for the drop.
#[derive(Debug, Clone)]
pub enum DropObjectInfo {
    Cluster(ClusterId),
    /// A cluster replica, together with the reason it is being dropped.
    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
    Database(DatabaseId),
    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
    Role(RoleId),
    Item(CatalogItemId),
    NetworkPolicy(NetworkPolicyId),
}
287
288impl DropObjectInfo {
289 pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
292 match id {
293 ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
294 ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
295 cluster_id,
296 replica_id,
297 ReplicaCreateDropReason::Manual,
298 )),
299 ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
300 ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
301 ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
302 ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
303 ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
304 }
305 }
306
307 fn to_object_id(&self) -> ObjectId {
310 match &self {
311 DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
312 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
313 ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
314 }
315 DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
316 DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
317 DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
318 DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
319 DropObjectInfo::NetworkPolicy(network_policy_id) => {
320 ObjectId::NetworkPolicy(network_policy_id.clone())
321 }
322 }
323 }
324}
325
/// Why a cluster replica is being created or dropped.
///
/// Converted into its audit log representation by
/// [`ReplicaCreateDropReason::into_audit_log`].
#[derive(Debug, Clone)]
pub enum ReplicaCreateDropReason {
    /// Recorded in the audit log as `CreateOrDropClusterReplicaReasonV1::Manual`, with no
    /// scheduling decisions attached.
    Manual,
    /// Recorded in the audit log as `CreateOrDropClusterReplicaReasonV1::Schedule`, together
    /// with the given scheduling decisions.
    ClusterScheduling(Vec<SchedulingDecision>),
}
338
339impl ReplicaCreateDropReason {
340 pub fn into_audit_log(
341 self,
342 ) -> (
343 CreateOrDropClusterReplicaReasonV1,
344 Option<SchedulingDecisionsWithReasonsV2>,
345 ) {
346 let (reason, scheduling_policies) = match self {
347 ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
348 ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
349 CreateOrDropClusterReplicaReasonV1::Schedule,
350 Some(scheduling_decisions),
351 ),
352 };
353 (
354 reason,
355 scheduling_policies
356 .as_ref()
357 .map(SchedulingDecision::reasons_to_audit_log_reasons),
358 )
359 }
360}
361
/// Everything [`Catalog::transact`] hands back to its caller after a successful commit.
pub struct TransactionResult {
    /// Builtin table updates produced while applying the transaction's state updates.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Parsed catalog state updates produced while applying the transaction's state updates.
    pub catalog_updates: Vec<ParsedStateUpdate>,
    /// Audit events collected while processing the ops.
    pub audit_events: Vec<VersionedEvent>,
}
368
/// Selects how `Catalog::transact_inner` treats storage state.
#[derive(Debug, Clone, Copy)]
enum TransactInnerMode {
    /// Used by [`Catalog::transact`]: if storage collections are provided, they prepare
    /// their state inside the transaction.
    Commit,
    /// Used by [`Catalog::transact_incremental_dry_run`]: no storage state is prepared, and
    /// passing storage collections is a debug-assertion failure.
    DryRun,
}
383
384impl Catalog {
    /// Returns whether changes to `item` should be written to the audit log.
    ///
    /// This is the case for every item that is not temporary.
    fn should_audit_log_item(item: &CatalogItem) -> bool {
        !item.is_temporary()
    }
388
389 fn temporary_ids(
392 &self,
393 ops: &[Op],
394 temporary_drops: BTreeSet<(&ConnectionId, String)>,
395 ) -> Result<BTreeSet<CatalogItemId>, Error> {
396 let mut creating = BTreeSet::new();
397 let mut temporary_ids = BTreeSet::new();
398 for op in ops.iter() {
399 if let Op::CreateItem {
400 id,
401 name,
402 item,
403 owner_id: _,
404 } = op
405 {
406 if let Some(conn_id) = item.conn_id() {
407 if self.item_exists_in_temp_schemas(conn_id, &name.item)
408 && !temporary_drops.contains(&(conn_id, name.item.clone()))
409 || creating.contains(&(conn_id, &name.item))
410 {
411 return Err(
412 SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
413 );
414 } else {
415 creating.insert((conn_id, &name.item));
416 temporary_ids.insert(id.clone());
417 }
418 }
419 }
420 }
421 Ok(temporary_ids)
422 }
423
424 #[instrument(name = "catalog::transact")]
425 pub async fn transact(
426 &mut self,
427 storage_collections: Option<&mut Arc<dyn StorageCollections + Send + Sync>>,
430 oracle_write_ts: mz_repr::Timestamp,
431 session: Option<&ConnMeta>,
432 ops: Vec<Op>,
433 ) -> Result<TransactionResult, AdapterError> {
434 trace!("transact: {:?}", ops);
435 fail::fail_point!("catalog_transact", |arg| {
436 Err(AdapterError::Unstructured(anyhow::anyhow!(
437 "failpoint: {arg:?}"
438 )))
439 });
440
441 let drop_ids: BTreeSet<CatalogItemId> = ops
442 .iter()
443 .filter_map(|op| match op {
444 Op::DropObjects(drop_object_infos) => {
445 let ids = drop_object_infos.iter().map(|info| info.to_object_id());
446 let item_ids = ids.filter_map(|id| match id {
447 ObjectId::Item(id) => Some(id),
448 _ => None,
449 });
450 Some(item_ids)
451 }
452 _ => None,
453 })
454 .flatten()
455 .collect();
456 let temporary_drops = drop_ids
457 .iter()
458 .filter_map(|id| {
459 let entry = self.get_entry(id);
460 match entry.item().conn_id() {
461 Some(conn_id) => Some((conn_id, entry.name().item.clone())),
462 None => None,
463 }
464 })
465 .collect();
466
467 let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
468 let mut builtin_table_updates = vec![];
469 let mut catalog_updates = vec![];
470 let mut audit_events = vec![];
471 let mut storage = self.storage().await;
472 let mut tx = storage
473 .transaction()
474 .await
475 .unwrap_or_terminate("starting catalog transaction");
476
477 let new_state = Self::transact_inner(
478 TransactInnerMode::Commit,
479 storage_collections,
480 oracle_write_ts,
481 session,
482 ops,
483 temporary_ids,
484 &mut builtin_table_updates,
485 &mut catalog_updates,
486 &mut audit_events,
487 &mut tx,
488 &self.state,
489 )
490 .await?;
491
492 tx.commit(oracle_write_ts)
497 .await
498 .unwrap_or_terminate("catalog storage transaction commit must succeed");
499
500 drop(storage);
503 if let Some(new_state) = new_state {
504 self.transient_revision += 1;
505 self.state = new_state;
506 }
507
508 Ok(TransactionResult {
509 builtin_table_updates,
510 catalog_updates,
511 audit_events,
512 })
513 }
514
515 pub async fn transact_incremental_dry_run(
530 &self,
531 base_state: &CatalogState,
532 ops: Vec<Op>,
533 session: Option<&ConnMeta>,
534 prev_snapshot: Option<Snapshot>,
535 oracle_write_ts: mz_repr::Timestamp,
536 ) -> Result<(CatalogState, Snapshot), AdapterError> {
537 let temporary_ids = self.temporary_ids(&ops, BTreeSet::new())?;
540
541 let mut builtin_table_updates = vec![];
542 let mut catalog_updates = vec![];
543 let mut audit_events = vec![];
544 let mut storage = self.storage().await;
545 let mut tx = if let Some(snapshot) = prev_snapshot {
546 storage
549 .transaction_from_snapshot(snapshot)
550 .unwrap_or_terminate("starting catalog transaction from snapshot")
551 } else {
552 storage
555 .transaction()
556 .await
557 .unwrap_or_terminate("starting catalog transaction")
558 };
559
560 let new_state = Self::transact_inner(
562 TransactInnerMode::DryRun,
563 None,
564 oracle_write_ts,
565 session,
566 ops,
567 temporary_ids,
568 &mut builtin_table_updates,
569 &mut catalog_updates,
570 &mut audit_events,
571 &mut tx,
572 base_state,
573 )
574 .await?;
575
576 let new_snapshot = tx.current_snapshot();
579
580 drop(storage);
582
583 let state = new_state.unwrap_or_else(|| base_state.clone());
585 Ok((state, new_snapshot))
586 }
587
588 fn extract_expressions_from_ops(
592 ops: &[Op],
593 optimizer_features: &OptimizerFeatures,
594 ) -> BTreeMap<GlobalId, LocalExpressions> {
595 let mut exprs = BTreeMap::new();
596
597 for op in ops {
598 if let Op::CreateItem { item, .. } = op {
599 match item {
600 CatalogItem::View(view) => {
601 exprs.insert(
602 view.global_id,
603 LocalExpressions {
604 local_mir: (*view.locally_optimized_expr).clone(),
605 optimizer_features: optimizer_features.clone(),
606 },
607 );
608 }
609 CatalogItem::MaterializedView(mv) => {
610 exprs.insert(
611 mv.global_id_writes(),
612 LocalExpressions {
613 local_mir: (*mv.locally_optimized_expr).clone(),
614 optimizer_features: optimizer_features.clone(),
615 },
616 );
617 }
618 CatalogItem::Table(_)
619 | CatalogItem::Source(_)
620 | CatalogItem::Log(_)
621 | CatalogItem::Sink(_)
622 | CatalogItem::Index(_)
623 | CatalogItem::Type(_)
624 | CatalogItem::Func(_)
625 | CatalogItem::Secret(_)
626 | CatalogItem::Connection(_) => {}
627 }
628 }
629 }
630
631 exprs
632 }
633
634 #[instrument(name = "catalog::transact_inner")]
642 async fn transact_inner(
643 mode: TransactInnerMode,
644 storage_collections: Option<&mut Arc<dyn StorageCollections + Send + Sync>>,
645 oracle_write_ts: mz_repr::Timestamp,
646 session: Option<&ConnMeta>,
647 ops: Vec<Op>,
648 temporary_ids: BTreeSet<CatalogItemId>,
649 builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
650 parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
651 audit_events: &mut Vec<VersionedEvent>,
652 tx: &mut Transaction<'_>,
653 state: &CatalogState,
654 ) -> Result<Option<CatalogState>, AdapterError> {
655 let mut preliminary_state = Cow::Borrowed(state);
682
683 let mut state = Cow::Borrowed(state);
685
686 if ops.is_empty() {
687 return Ok(None);
688 }
689
690 let optimizer_features = OptimizerFeatures::from(state.system_config());
693 let cached_exprs = Self::extract_expressions_from_ops(&ops, &optimizer_features);
694
695 let mut storage_collections_to_create = BTreeSet::new();
696 let mut storage_collections_to_drop = BTreeSet::new();
697 let mut storage_collections_to_register = BTreeMap::new();
698
699 let mut updates = Vec::new();
700
701 for op in ops {
702 let temporary_item_updates = Self::transact_op(
703 oracle_write_ts,
704 session,
705 op,
706 &temporary_ids,
707 audit_events,
708 tx,
709 &*preliminary_state,
710 &mut storage_collections_to_create,
711 &mut storage_collections_to_drop,
712 &mut storage_collections_to_register,
713 )
714 .await?;
715
716 let upper = tx.upper();
721 let temporary_item_updates =
722 temporary_item_updates
723 .into_iter()
724 .map(|(item, diff)| StateUpdate {
725 kind: StateUpdateKind::TemporaryItem(item),
726 ts: upper,
727 diff,
728 });
729
730 let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
731 op_updates.extend(temporary_item_updates);
732 if !op_updates.is_empty() {
733 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
736 let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
737 .to_mut()
738 .apply_updates(op_updates.clone(), &mut local_expr_cache)
739 .await;
740 }
741 updates.append(&mut op_updates);
742 }
743
744 if !updates.is_empty() {
745 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
746 let (op_builtin_table_updates, op_catalog_updates) = state
747 .to_mut()
748 .apply_updates(updates.clone(), &mut local_expr_cache)
749 .await;
750 let op_builtin_table_updates = state
751 .to_mut()
752 .resolve_builtin_table_updates(op_builtin_table_updates);
753 builtin_table_updates.extend(op_builtin_table_updates);
754 parsed_catalog_updates.extend(op_catalog_updates);
755 }
756
757 match mode {
758 TransactInnerMode::Commit => {
759 if let Some(c) = storage_collections {
761 c.prepare_state(
762 tx,
763 storage_collections_to_create,
764 storage_collections_to_drop,
765 storage_collections_to_register,
766 )
767 .await?;
768 }
769 }
770 TransactInnerMode::DryRun => {
771 debug_assert!(
772 storage_collections.is_none(),
773 "dry-run mode must not prepare storage state"
774 );
775 }
776 }
777
778 let updates = tx.get_and_commit_op_updates();
779 if !updates.is_empty() {
780 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
781 let (op_builtin_table_updates, op_catalog_updates) = state
782 .to_mut()
783 .apply_updates(updates.clone(), &mut local_expr_cache)
784 .await;
785 let op_builtin_table_updates = state
786 .to_mut()
787 .resolve_builtin_table_updates(op_builtin_table_updates);
788 builtin_table_updates.extend(op_builtin_table_updates);
789 parsed_catalog_updates.extend(op_catalog_updates);
790 }
791
792 match state {
793 Cow::Owned(state) => Ok(Some(state)),
794 Cow::Borrowed(_) => Ok(None),
795 }
796 }
797
798 #[instrument]
806 async fn transact_op(
807 oracle_write_ts: mz_repr::Timestamp,
808 session: Option<&ConnMeta>,
809 op: Op,
810 temporary_ids: &BTreeSet<CatalogItemId>,
811 audit_events: &mut Vec<VersionedEvent>,
812 tx: &mut Transaction<'_>,
813 state: &CatalogState,
814 storage_collections_to_create: &mut BTreeSet<GlobalId>,
815 storage_collections_to_drop: &mut BTreeSet<GlobalId>,
816 storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
817 ) -> Result<Vec<(TemporaryItem, StateDiff)>, AdapterError> {
818 let mut temporary_item_updates = Vec::new();
819
820 match op {
821 Op::AlterRetainHistory { id, value, window } => {
822 let entry = state.get_entry(&id);
823 if id.is_system() {
824 let name = entry.name();
825 let full_name =
826 state.resolve_full_name(name, session.map(|session| session.conn_id()));
827 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
828 full_name.to_string(),
829 ))));
830 }
831
832 let mut new_entry = entry.clone();
833 let previous = new_entry
834 .item
835 .update_retain_history(value.clone(), window)
836 .map_err(|_| {
837 AdapterError::Catalog(Error::new(ErrorKind::Internal(
838 "planner should have rejected invalid alter retain history item type"
839 .to_string(),
840 )))
841 })?;
842
843 if Self::should_audit_log_item(new_entry.item()) {
844 let details =
845 EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
846 id: id.to_string(),
847 old_history: previous.map(|previous| previous.to_string()),
848 new_history: value.map(|v| v.to_string()),
849 });
850 CatalogState::add_to_audit_log(
851 &state.system_configuration,
852 oracle_write_ts,
853 session,
854 tx,
855 audit_events,
856 EventType::Alter,
857 catalog_type_to_audit_object_type(new_entry.item().typ()),
858 details,
859 )?;
860 }
861
862 tx.update_item(id, new_entry.into())?;
863
864 Self::log_update(state, &id);
865 }
866 Op::AlterSourceTimestampInterval {
867 id,
868 value,
869 interval,
870 } => {
871 let entry = state.get_entry(&id);
872 if id.is_system() {
873 let name = entry.name();
874 let full_name =
875 state.resolve_full_name(name, session.map(|session| session.conn_id()));
876 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
877 full_name.to_string(),
878 ))));
879 }
880
881 let mut new_entry = entry.clone();
882 let previous = new_entry
883 .item
884 .update_timestamp_interval(value.clone(), interval)
885 .map_err(|_| {
886 AdapterError::Catalog(Error::new(ErrorKind::Internal(
887 "planner should have rejected invalid alter timestamp interval item type"
888 .to_string(),
889 )))
890 })?;
891
892 if Self::should_audit_log_item(new_entry.item()) {
893 let details = EventDetails::AlterSourceTimestampIntervalV1(
894 mz_audit_log::AlterSourceTimestampIntervalV1 {
895 id: id.to_string(),
896 old_interval: previous.map(|previous| previous.to_string()),
897 new_interval: value.map(|v| v.to_string()),
898 },
899 );
900 CatalogState::add_to_audit_log(
901 &state.system_configuration,
902 oracle_write_ts,
903 session,
904 tx,
905 audit_events,
906 EventType::Alter,
907 catalog_type_to_audit_object_type(new_entry.item().typ()),
908 details,
909 )?;
910 }
911
912 tx.update_item(id, new_entry.into())?;
913
914 Self::log_update(state, &id);
915 }
916 Op::AlterRole {
917 id,
918 name,
919 attributes,
920 nopassword,
921 vars,
922 } => {
923 state.ensure_not_reserved_role(&id)?;
924
925 let mut existing_role = state.get_role(&id).clone();
926 let password = attributes.password.clone();
927 let scram_iterations = attributes
928 .scram_iterations
929 .unwrap_or_else(|| state.system_config().scram_iterations());
930 existing_role.attributes = attributes.into();
931 existing_role.vars = vars;
932 let password_action = if nopassword {
933 PasswordAction::Clear
934 } else if let Some(password) = password {
935 PasswordAction::Set(PasswordConfig {
936 password,
937 scram_iterations,
938 })
939 } else {
940 PasswordAction::NoChange
941 };
942 tx.update_role(id, existing_role.into(), password_action)?;
943
944 CatalogState::add_to_audit_log(
945 &state.system_configuration,
946 oracle_write_ts,
947 session,
948 tx,
949 audit_events,
950 EventType::Alter,
951 ObjectType::Role,
952 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
953 id: id.to_string(),
954 name: name.clone(),
955 }),
956 )?;
957
958 info!("update role {name} ({id})");
959 }
960 Op::AlterNetworkPolicy {
961 id,
962 rules,
963 name,
964 owner_id: _owner_id,
965 } => {
966 let existing_policy = state.get_network_policy(&id).clone();
967 let mut policy: NetworkPolicy = existing_policy.into();
968 policy.rules = rules;
969 if is_reserved_name(&name) {
970 return Err(AdapterError::Catalog(Error::new(
971 ErrorKind::ReservedNetworkPolicyName(name),
972 )));
973 }
974 tx.update_network_policy(id, policy.clone())?;
975
976 CatalogState::add_to_audit_log(
977 &state.system_configuration,
978 oracle_write_ts,
979 session,
980 tx,
981 audit_events,
982 EventType::Alter,
983 ObjectType::NetworkPolicy,
984 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
985 id: id.to_string(),
986 name: name.clone(),
987 }),
988 )?;
989
990 info!("update network policy {name} ({id})");
991 }
992 Op::AlterAddColumn {
993 id,
994 new_global_id,
995 name,
996 typ,
997 sql,
998 } => {
999 let column_name = name.to_string();
1000 let column_type = typ.to_string();
1001 let nullable = typ.nullable;
1002 let mut new_entry = state.get_entry(&id).clone();
1003 let version = new_entry.item.add_column(name, typ, sql)?;
1004 let shard_id = state
1007 .storage_metadata()
1008 .get_collection_shard(new_entry.latest_global_id())?;
1009
1010 let CatalogItem::Table(table) = &mut new_entry.item else {
1012 return Err(AdapterError::Unsupported("adding columns to non-Table"));
1013 };
1014 table.collections.insert(version, new_global_id);
1015
1016 if Self::should_audit_log_item(new_entry.item()) {
1017 let details = EventDetails::AlterAddColumnV1(mz_audit_log::AlterAddColumnV1 {
1018 id: id.to_string(),
1019 column: column_name,
1020 column_type,
1021 nullable,
1022 });
1023 CatalogState::add_to_audit_log(
1024 &state.system_configuration,
1025 oracle_write_ts,
1026 session,
1027 tx,
1028 audit_events,
1029 EventType::Alter,
1030 catalog_type_to_audit_object_type(new_entry.item().typ()),
1031 details,
1032 )?;
1033 }
1034
1035 tx.update_item(id, new_entry.into())?;
1036 storage_collections_to_register.insert(new_global_id, shard_id);
1037 }
1038 Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
1039 let mut new_entry = state.get_entry(&id).clone();
1040 let replacement = state.get_entry(&replacement_id);
1041
1042 let new_audit_events =
1043 apply_replacement_audit_events(state, &new_entry, replacement);
1044
1045 let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
1046 return Err(AdapterError::internal(
1047 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1048 "id must refer to a materialized view",
1049 ));
1050 };
1051 let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
1052 return Err(AdapterError::internal(
1053 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1054 "replacement_id must refer to a materialized view",
1055 ));
1056 };
1057
1058 mv.apply_replacement(replacement_mv.clone());
1059
1060 tx.remove_item(replacement_id)?;
1061
1062 new_entry.id = replacement_id;
1063 tx_replace_item(tx, state, id, new_entry)?;
1064
1065 let comment_id = CommentObjectId::MaterializedView(replacement_id);
1066 tx.drop_comments(&[comment_id].into())?;
1067
1068 for (event_type, details) in new_audit_events {
1069 CatalogState::add_to_audit_log(
1070 &state.system_configuration,
1071 oracle_write_ts,
1072 session,
1073 tx,
1074 audit_events,
1075 event_type,
1076 ObjectType::MaterializedView,
1077 details,
1078 )?;
1079 }
1080 }
1081 Op::CreateDatabase { name, owner_id } => {
1082 let database_owner_privileges = vec![rbac::owner_privilege(
1083 mz_sql::catalog::ObjectType::Database,
1084 owner_id,
1085 )];
1086 let database_default_privileges = state
1087 .default_privileges
1088 .get_applicable_privileges(
1089 owner_id,
1090 None,
1091 None,
1092 mz_sql::catalog::ObjectType::Database,
1093 )
1094 .map(|item| item.mz_acl_item(owner_id));
1095 let database_privileges: Vec<_> = merge_mz_acl_items(
1096 database_owner_privileges
1097 .into_iter()
1098 .chain(database_default_privileges),
1099 )
1100 .collect();
1101
1102 let schema_owner_privileges = vec![rbac::owner_privilege(
1103 mz_sql::catalog::ObjectType::Schema,
1104 owner_id,
1105 )];
1106 let schema_default_privileges = state
1107 .default_privileges
1108 .get_applicable_privileges(
1109 owner_id,
1110 None,
1111 None,
1112 mz_sql::catalog::ObjectType::Schema,
1113 )
1114 .map(|item| item.mz_acl_item(owner_id))
1115 .chain(std::iter::once(MzAclItem {
1117 grantee: RoleId::Public,
1118 grantor: owner_id,
1119 acl_mode: AclMode::USAGE,
1120 }));
1121 let schema_privileges: Vec<_> = merge_mz_acl_items(
1122 schema_owner_privileges
1123 .into_iter()
1124 .chain(schema_default_privileges),
1125 )
1126 .collect();
1127
1128 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1129 let (database_id, _) = tx.insert_user_database(
1130 &name,
1131 owner_id,
1132 database_privileges.clone(),
1133 &temporary_oids,
1134 )?;
1135 let (schema_id, _) = tx.insert_user_schema(
1136 database_id,
1137 DEFAULT_SCHEMA,
1138 owner_id,
1139 schema_privileges.clone(),
1140 &temporary_oids,
1141 )?;
1142 CatalogState::add_to_audit_log(
1143 &state.system_configuration,
1144 oracle_write_ts,
1145 session,
1146 tx,
1147 audit_events,
1148 EventType::Create,
1149 ObjectType::Database,
1150 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1151 id: database_id.to_string(),
1152 name: name.clone(),
1153 }),
1154 )?;
1155 info!("create database {}", name);
1156
1157 CatalogState::add_to_audit_log(
1158 &state.system_configuration,
1159 oracle_write_ts,
1160 session,
1161 tx,
1162 audit_events,
1163 EventType::Create,
1164 ObjectType::Schema,
1165 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1166 id: schema_id.to_string(),
1167 name: DEFAULT_SCHEMA.to_string(),
1168 database_name: Some(name),
1169 }),
1170 )?;
1171 }
1172 Op::CreateSchema {
1173 database_id,
1174 schema_name,
1175 owner_id,
1176 } => {
1177 if is_reserved_name(&schema_name) {
1178 return Err(AdapterError::Catalog(Error::new(
1179 ErrorKind::ReservedSchemaName(schema_name),
1180 )));
1181 }
1182 let database_id = match database_id {
1183 ResolvedDatabaseSpecifier::Id(id) => id,
1184 ResolvedDatabaseSpecifier::Ambient => {
1185 return Err(AdapterError::Catalog(Error::new(
1186 ErrorKind::ReadOnlySystemSchema(schema_name),
1187 )));
1188 }
1189 };
1190 let owner_privileges = vec![rbac::owner_privilege(
1191 mz_sql::catalog::ObjectType::Schema,
1192 owner_id,
1193 )];
1194 let default_privileges = state
1195 .default_privileges
1196 .get_applicable_privileges(
1197 owner_id,
1198 Some(database_id),
1199 None,
1200 mz_sql::catalog::ObjectType::Schema,
1201 )
1202 .map(|item| item.mz_acl_item(owner_id));
1203 let privileges: Vec<_> =
1204 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1205 .collect();
1206 let (schema_id, _) = tx.insert_user_schema(
1207 database_id,
1208 &schema_name,
1209 owner_id,
1210 privileges.clone(),
1211 &state.get_temporary_oids().collect(),
1212 )?;
1213 CatalogState::add_to_audit_log(
1214 &state.system_configuration,
1215 oracle_write_ts,
1216 session,
1217 tx,
1218 audit_events,
1219 EventType::Create,
1220 ObjectType::Schema,
1221 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1222 id: schema_id.to_string(),
1223 name: schema_name.clone(),
1224 database_name: Some(state.database_by_id[&database_id].name.clone()),
1225 }),
1226 )?;
1227 }
1228 Op::CreateRole { name, attributes } => {
1229 if is_reserved_role_name(&name) {
1230 return Err(AdapterError::Catalog(Error::new(
1231 ErrorKind::ReservedRoleName(name),
1232 )));
1233 }
1234 let membership = RoleMembership::new();
1235 let vars = RoleVars::default();
1236 let (id, _) = tx.insert_user_role(
1237 name.clone(),
1238 attributes.clone(),
1239 membership.clone(),
1240 vars.clone(),
1241 &state.get_temporary_oids().collect(),
1242 )?;
1243 CatalogState::add_to_audit_log(
1244 &state.system_configuration,
1245 oracle_write_ts,
1246 session,
1247 tx,
1248 audit_events,
1249 EventType::Create,
1250 ObjectType::Role,
1251 EventDetails::CreateRoleV1(mz_audit_log::CreateRoleV1 {
1252 id: id.to_string(),
1253 name: name.clone(),
1254 auto_provision_source: attributes.auto_provision_source.map(|s| match s {
1255 AutoProvisionSource::Oidc => "oidc".to_string(),
1256 AutoProvisionSource::Frontegg => "frontegg".to_string(),
1257 AutoProvisionSource::None => "none".to_string(),
1258 }),
1259 }),
1260 )?;
1261 info!("create role {}", name);
1262 }
1263 Op::CreateCluster {
1264 id,
1265 name,
1266 introspection_sources,
1267 owner_id,
1268 config,
1269 } => {
1270 if is_reserved_name(&name) {
1271 return Err(AdapterError::Catalog(Error::new(
1272 ErrorKind::ReservedClusterName(name),
1273 )));
1274 }
1275 let owner_privileges = vec![rbac::owner_privilege(
1276 mz_sql::catalog::ObjectType::Cluster,
1277 owner_id,
1278 )];
1279 let default_privileges = state
1280 .default_privileges
1281 .get_applicable_privileges(
1282 owner_id,
1283 None,
1284 None,
1285 mz_sql::catalog::ObjectType::Cluster,
1286 )
1287 .map(|item| item.mz_acl_item(owner_id));
1288 let privileges: Vec<_> =
1289 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1290 .collect();
1291 let introspection_source_ids: Vec<_> = introspection_sources
1292 .iter()
1293 .map(|introspection_source| {
1294 Transaction::allocate_introspection_source_index_id(
1295 &id,
1296 introspection_source.variant,
1297 )
1298 })
1299 .collect();
1300
1301 let introspection_sources = introspection_sources
1302 .into_iter()
1303 .zip_eq(introspection_source_ids)
1304 .map(|(log, (item_id, gid))| (log, item_id, gid))
1305 .collect();
1306
1307 tx.insert_user_cluster(
1308 id,
1309 &name,
1310 introspection_sources,
1311 owner_id,
1312 privileges.clone(),
1313 config.clone().into(),
1314 &state.get_temporary_oids().collect(),
1315 )?;
1316 CatalogState::add_to_audit_log(
1317 &state.system_configuration,
1318 oracle_write_ts,
1319 session,
1320 tx,
1321 audit_events,
1322 EventType::Create,
1323 ObjectType::Cluster,
1324 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1325 id: id.to_string(),
1326 name: name.clone(),
1327 }),
1328 )?;
1329 info!("create cluster {}", name);
1330 }
1331 Op::CreateClusterReplica {
1332 cluster_id,
1333 replica_id,
1334 name,
1335 config,
1336 owner_id,
1337 reason,
1338 } => {
1339 if is_reserved_name(&name) {
1340 return Err(AdapterError::Catalog(Error::new(
1341 ErrorKind::ReservedReplicaName(name),
1342 )));
1343 }
1344 let cluster = state.get_cluster(cluster_id);
1345 tx.insert_cluster_replica_with_id(
1349 cluster_id,
1350 replica_id,
1351 &name,
1352 config.clone().into(),
1353 owner_id,
1354 )?;
1355 if let ReplicaLocation::Managed(ManagedReplicaLocation {
1356 size,
1357 billed_as,
1358 internal,
1359 ..
1360 }) = &config.location
1361 {
1362 let (reason, scheduling_policies) = reason.into_audit_log();
1363 let details = EventDetails::CreateClusterReplicaV4(
1364 mz_audit_log::CreateClusterReplicaV4 {
1365 cluster_id: cluster_id.to_string(),
1366 cluster_name: cluster.name.clone(),
1367 replica_id: Some(replica_id.to_string()),
1368 replica_name: name.clone(),
1369 logical_size: size.clone(),
1370 billed_as: billed_as.clone(),
1371 internal: *internal,
1372 reason,
1373 scheduling_policies,
1374 },
1375 );
1376 CatalogState::add_to_audit_log(
1377 &state.system_configuration,
1378 oracle_write_ts,
1379 session,
1380 tx,
1381 audit_events,
1382 EventType::Create,
1383 ObjectType::ClusterReplica,
1384 details,
1385 )?;
1386 }
1387 }
1388 Op::CreateItem {
1389 id,
1390 name,
1391 item,
1392 owner_id,
1393 } => {
1394 state.check_unstable_dependencies(&item)?;
1395
1396 match &item {
1397 CatalogItem::Table(table) => {
1398 let gids: Vec<_> = table.global_ids().collect();
1399 assert_eq!(gids.len(), 1);
1400 storage_collections_to_create.extend(gids);
1401 }
1402 CatalogItem::Source(source) => {
1403 storage_collections_to_create.insert(source.global_id());
1404 }
1405 CatalogItem::MaterializedView(mv) => {
1406 let mv_gid = mv.global_id_writes();
1407 if let Some(target_id) = mv.replacement_target {
1408 let target_gid = state.get_entry(&target_id).latest_global_id();
1409 let shard_id =
1410 state.storage_metadata().get_collection_shard(target_gid)?;
1411 storage_collections_to_register.insert(mv_gid, shard_id);
1412 } else {
1413 storage_collections_to_create.insert(mv_gid);
1414 }
1415 }
1416 CatalogItem::Sink(sink) => {
1417 storage_collections_to_create.insert(sink.global_id());
1418 }
1419 CatalogItem::Log(_)
1420 | CatalogItem::View(_)
1421 | CatalogItem::Index(_)
1422 | CatalogItem::Type(_)
1423 | CatalogItem::Func(_)
1424 | CatalogItem::Secret(_)
1425 | CatalogItem::Connection(_) => (),
1426 }
1427
1428 let system_user = session.map_or(false, |s| s.user().is_system_user());
1429 if !system_user {
1430 if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1431 let cluster_name = state.clusters_by_id[&id].name.clone();
1432 return Err(AdapterError::Catalog(Error::new(
1433 ErrorKind::ReadOnlyCluster(cluster_name),
1434 )));
1435 }
1436 }
1437
1438 let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1439 let default_privileges = state
1440 .default_privileges
1441 .get_applicable_privileges(
1442 owner_id,
1443 name.qualifiers.database_spec.id(),
1444 Some(name.qualifiers.schema_spec.into()),
1445 item.typ().into(),
1446 )
1447 .map(|item| item.mz_acl_item(owner_id));
1448 let progress_source_privilege = if item.is_progress_source() {
1450 Some(MzAclItem {
1451 grantee: MZ_SUPPORT_ROLE_ID,
1452 grantor: owner_id,
1453 acl_mode: AclMode::SELECT,
1454 })
1455 } else {
1456 None
1457 };
1458 let privileges: Vec<_> = merge_mz_acl_items(
1459 owner_privileges
1460 .into_iter()
1461 .chain(default_privileges)
1462 .chain(progress_source_privilege),
1463 )
1464 .collect();
1465
1466 let temporary_oids = state.get_temporary_oids().collect();
1467
1468 if item.is_temporary() {
1469 if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1470 || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1471 {
1472 return Err(AdapterError::Catalog(Error::new(
1473 ErrorKind::InvalidTemporarySchema,
1474 )));
1475 }
1476 let oid = tx.allocate_oid(&temporary_oids)?;
1477
1478 let schema_id = name.qualifiers.schema_spec.clone().into();
1479 let item_type = item.typ();
1480 let (create_sql, global_id, versions) = item.to_serialized();
1481
1482 let item = TemporaryItem {
1483 id,
1484 oid,
1485 global_id,
1486 schema_id,
1487 name: name.item.clone(),
1488 create_sql,
1489 conn_id: item.conn_id().cloned(),
1490 owner_id,
1491 privileges: privileges.clone(),
1492 extra_versions: versions,
1493 };
1494 temporary_item_updates.push((item, StateDiff::Addition));
1495
1496 info!(
1497 "create temporary {} {} ({})",
1498 item_type,
1499 state.resolve_full_name(&name, None),
1500 id
1501 );
1502 } else {
1503 if let Some(temp_id) =
1504 item.uses()
1505 .iter()
1506 .find(|id| match state.try_get_entry(*id) {
1507 Some(entry) => entry.item().is_temporary(),
1508 None => temporary_ids.contains(id),
1509 })
1510 {
1511 let temp_item = state.get_entry(temp_id);
1512 return Err(AdapterError::Catalog(Error::new(
1513 ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1514 )));
1515 }
1516 if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1517 && !system_user
1518 {
1519 let schema_name = state
1520 .resolve_full_name(&name, session.map(|session| session.conn_id()))
1521 .schema;
1522 return Err(AdapterError::Catalog(Error::new(
1523 ErrorKind::ReadOnlySystemSchema(schema_name),
1524 )));
1525 }
1526 let schema_id = name.qualifiers.schema_spec.clone().into();
1527 let item_type = item.typ();
1528 let (create_sql, global_id, versions) = item.to_serialized();
1529 tx.insert_user_item(
1530 id,
1531 global_id,
1532 schema_id,
1533 &name.item,
1534 create_sql,
1535 owner_id,
1536 privileges.clone(),
1537 &temporary_oids,
1538 versions,
1539 )?;
1540 info!(
1541 "create {} {} ({})",
1542 item_type,
1543 state.resolve_full_name(&name, None),
1544 id
1545 );
1546 }
1547
1548 if Self::should_audit_log_item(&item) {
1549 let name = Self::full_name_detail(
1550 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1551 );
1552 let details = match &item {
1553 CatalogItem::Source(s) => {
1554 let cluster_id = match s.data_source {
1555 DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1558 match state.get_entry(&ingestion_id).cluster_id() {
1559 Some(cluster_id) => Some(cluster_id.to_string()),
1560 None => None,
1561 }
1562 }
1563 _ => match item.cluster_id() {
1564 Some(cluster_id) => Some(cluster_id.to_string()),
1565 None => None,
1566 },
1567 };
1568
1569 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1570 id: id.to_string(),
1571 cluster_id,
1572 name,
1573 external_type: s.source_type().to_string(),
1574 })
1575 }
1576 CatalogItem::Sink(s) => {
1577 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1578 id: id.to_string(),
1579 cluster_id: Some(s.cluster_id.to_string()),
1580 name,
1581 external_type: s.sink_type().to_string(),
1582 })
1583 }
1584 CatalogItem::Index(i) => {
1585 EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1586 id: id.to_string(),
1587 name,
1588 cluster_id: i.cluster_id.to_string(),
1589 })
1590 }
1591 CatalogItem::MaterializedView(mv) => {
1592 EventDetails::CreateMaterializedViewV1(
1593 mz_audit_log::CreateMaterializedViewV1 {
1594 id: id.to_string(),
1595 name,
1596 cluster_id: mv.cluster_id.to_string(),
1597 replacement_target_id: mv
1598 .replacement_target
1599 .map(|id| id.to_string()),
1600 },
1601 )
1602 }
1603 CatalogItem::Table(_)
1604 | CatalogItem::Log(_)
1605 | CatalogItem::View(_)
1606 | CatalogItem::Type(_)
1607 | CatalogItem::Func(_)
1608 | CatalogItem::Secret(_)
1609 | CatalogItem::Connection(_) => EventDetails::IdFullNameV1(IdFullNameV1 {
1610 id: id.to_string(),
1611 name,
1612 }),
1613 };
1614 CatalogState::add_to_audit_log(
1615 &state.system_configuration,
1616 oracle_write_ts,
1617 session,
1618 tx,
1619 audit_events,
1620 EventType::Create,
1621 catalog_type_to_audit_object_type(item.typ()),
1622 details,
1623 )?;
1624 }
1625 }
1626 Op::CreateNetworkPolicy {
1627 rules,
1628 name,
1629 owner_id,
1630 } => {
1631 if state.network_policies_by_name.contains_key(&name) {
1632 return Err(AdapterError::PlanError(PlanError::Catalog(
1633 SqlCatalogError::NetworkPolicyAlreadyExists(name),
1634 )));
1635 }
1636 if is_reserved_name(&name) {
1637 return Err(AdapterError::Catalog(Error::new(
1638 ErrorKind::ReservedNetworkPolicyName(name),
1639 )));
1640 }
1641
1642 let owner_privileges = vec![rbac::owner_privilege(
1643 mz_sql::catalog::ObjectType::NetworkPolicy,
1644 owner_id,
1645 )];
1646 let default_privileges = state
1647 .default_privileges
1648 .get_applicable_privileges(
1649 owner_id,
1650 None,
1651 None,
1652 mz_sql::catalog::ObjectType::NetworkPolicy,
1653 )
1654 .map(|item| item.mz_acl_item(owner_id));
1655 let privileges: Vec<_> =
1656 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1657 .collect();
1658
1659 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1660 let id = tx.insert_user_network_policy(
1661 name.clone(),
1662 rules,
1663 privileges,
1664 owner_id,
1665 &temporary_oids,
1666 )?;
1667
1668 CatalogState::add_to_audit_log(
1669 &state.system_configuration,
1670 oracle_write_ts,
1671 session,
1672 tx,
1673 audit_events,
1674 EventType::Create,
1675 ObjectType::NetworkPolicy,
1676 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1677 id: id.to_string(),
1678 name: name.clone(),
1679 }),
1680 )?;
1681
1682 info!("created network policy {name} ({id})");
1683 }
1684 Op::Comment {
1685 object_id,
1686 sub_component,
1687 comment,
1688 } => {
1689 tx.update_comment(object_id, sub_component, comment)?;
1690 let entry = state.get_comment_id_entry(&object_id);
1691 let should_log = entry
1692 .map(|entry| Self::should_audit_log_item(entry.item()))
1693 .unwrap_or(true);
1695 if let (Some(conn_id), true) =
1698 (session.map(|session| session.conn_id()), should_log)
1699 {
1700 CatalogState::add_to_audit_log(
1701 &state.system_configuration,
1702 oracle_write_ts,
1703 session,
1704 tx,
1705 audit_events,
1706 EventType::Comment,
1707 comment_id_to_audit_object_type(object_id),
1708 EventDetails::IdNameV1(IdNameV1 {
1709 id: format!("{object_id:?}"),
1711 name: state.comment_id_to_audit_log_name(object_id, conn_id),
1712 }),
1713 )?;
1714 }
1715 }
1716 Op::UpdateSourceReferences {
1717 source_id,
1718 references,
1719 } => {
1720 tx.update_source_references(
1721 source_id,
1722 references
1723 .references
1724 .into_iter()
1725 .map(|reference| reference.into())
1726 .collect(),
1727 references.updated_at,
1728 )?;
1729 }
1730 Op::DropObjects(drop_object_infos) => {
1731 let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1733
1734 tx.drop_comments(&delta.comments)?;
1736
1737 let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1739 delta
1740 .items
1741 .iter()
1742 .map(|id| id)
1743 .partition(|id| !state.get_entry(*id).item().is_temporary());
1744 tx.remove_items(&durable_items_to_drop)?;
1745 temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1746 let entry = state.get_entry(&id);
1747 (entry.clone().into(), StateDiff::Retraction)
1748 }));
1749
1750 for item_id in delta.items {
1751 let entry = state.get_entry(&item_id);
1752
1753 if entry.item().is_storage_collection() {
1754 storage_collections_to_drop.extend(entry.global_ids());
1755 }
1756
1757 if state.source_references.contains_key(&item_id) {
1758 tx.remove_source_references(item_id)?;
1759 }
1760
1761 if Self::should_audit_log_item(entry.item()) {
1762 CatalogState::add_to_audit_log(
1763 &state.system_configuration,
1764 oracle_write_ts,
1765 session,
1766 tx,
1767 audit_events,
1768 EventType::Drop,
1769 catalog_type_to_audit_object_type(entry.item().typ()),
1770 EventDetails::IdFullNameV1(IdFullNameV1 {
1771 id: item_id.to_string(),
1772 name: Self::full_name_detail(&state.resolve_full_name(
1773 entry.name(),
1774 session.map(|session| session.conn_id()),
1775 )),
1776 }),
1777 )?;
1778 }
1779 info!(
1780 "drop {} {} ({})",
1781 entry.item_type(),
1782 state.resolve_full_name(entry.name(), entry.conn_id()),
1783 item_id
1784 );
1785 }
1786
1787 let schemas = delta
1789 .schemas
1790 .iter()
1791 .map(|(schema_spec, database_spec)| {
1792 (SchemaId::from(schema_spec), *database_spec)
1793 })
1794 .collect();
1795 tx.remove_schemas(&schemas)?;
1796
1797 for (schema_spec, database_spec) in delta.schemas {
1798 let schema = state.get_schema(
1799 &database_spec,
1800 &schema_spec,
1801 session
1802 .map(|session| session.conn_id())
1803 .unwrap_or(&SYSTEM_CONN_ID),
1804 );
1805
1806 let schema_id = SchemaId::from(schema_spec);
1807 let database_id = match database_spec {
1808 ResolvedDatabaseSpecifier::Ambient => None,
1809 ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1810 };
1811
1812 CatalogState::add_to_audit_log(
1813 &state.system_configuration,
1814 oracle_write_ts,
1815 session,
1816 tx,
1817 audit_events,
1818 EventType::Drop,
1819 ObjectType::Schema,
1820 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1821 id: schema_id.to_string(),
1822 name: schema.name.schema.to_string(),
1823 database_name: database_id
1824 .map(|database_id| state.database_by_id[&database_id].name.clone()),
1825 }),
1826 )?;
1827 }
1828
1829 tx.remove_databases(&delta.databases)?;
1831
1832 for database_id in delta.databases {
1833 let database = state.get_database(&database_id).clone();
1834
1835 CatalogState::add_to_audit_log(
1836 &state.system_configuration,
1837 oracle_write_ts,
1838 session,
1839 tx,
1840 audit_events,
1841 EventType::Drop,
1842 ObjectType::Database,
1843 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1844 id: database_id.to_string(),
1845 name: database.name.clone(),
1846 }),
1847 )?;
1848 }
1849
1850 tx.remove_user_roles(&delta.roles)?;
1852
1853 for role_id in delta.roles {
1854 let role = state
1855 .roles_by_id
1856 .get(&role_id)
1857 .expect("catalog out of sync");
1858
1859 CatalogState::add_to_audit_log(
1860 &state.system_configuration,
1861 oracle_write_ts,
1862 session,
1863 tx,
1864 audit_events,
1865 EventType::Drop,
1866 ObjectType::Role,
1867 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1868 id: role.id.to_string(),
1869 name: role.name.clone(),
1870 }),
1871 )?;
1872 info!("drop role {}", role.name());
1873 }
1874
1875 tx.remove_network_policies(&delta.network_policies)?;
1877
1878 for network_policy_id in delta.network_policies {
1879 let policy = state
1880 .network_policies_by_id
1881 .get(&network_policy_id)
1882 .expect("catalog out of sync");
1883
1884 CatalogState::add_to_audit_log(
1885 &state.system_configuration,
1886 oracle_write_ts,
1887 session,
1888 tx,
1889 audit_events,
1890 EventType::Drop,
1891 ObjectType::NetworkPolicy,
1892 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1893 id: policy.id.to_string(),
1894 name: policy.name.clone(),
1895 }),
1896 )?;
1897 info!("drop network policy {}", policy.name.clone());
1898 }
1899
1900 let replicas = delta.replicas.keys().copied().collect();
1902 tx.remove_cluster_replicas(&replicas)?;
1903
1904 for (replica_id, (cluster_id, reason)) in delta.replicas {
1905 let cluster = state.get_cluster(cluster_id);
1906 let replica = cluster.replica(replica_id).expect("Must exist");
1907
1908 let (reason, scheduling_policies) = reason.into_audit_log();
1909 let details =
1910 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1911 cluster_id: cluster_id.to_string(),
1912 cluster_name: cluster.name.clone(),
1913 replica_id: Some(replica_id.to_string()),
1914 replica_name: replica.name.clone(),
1915 reason,
1916 scheduling_policies,
1917 });
1918 CatalogState::add_to_audit_log(
1919 &state.system_configuration,
1920 oracle_write_ts,
1921 session,
1922 tx,
1923 audit_events,
1924 EventType::Drop,
1925 ObjectType::ClusterReplica,
1926 details,
1927 )?;
1928 }
1929
1930 tx.remove_clusters(&delta.clusters)?;
1932
1933 for cluster_id in delta.clusters {
1934 let cluster = state.get_cluster(cluster_id);
1935
1936 CatalogState::add_to_audit_log(
1937 &state.system_configuration,
1938 oracle_write_ts,
1939 session,
1940 tx,
1941 audit_events,
1942 EventType::Drop,
1943 ObjectType::Cluster,
1944 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1945 id: cluster.id.to_string(),
1946 name: cluster.name.clone(),
1947 }),
1948 )?;
1949 }
1950 }
1951 Op::GrantRole {
1952 role_id,
1953 member_id,
1954 grantor_id,
1955 } => {
1956 state.ensure_not_reserved_role(&member_id)?;
1957 state.ensure_grantable_role(&role_id)?;
1958 if state.collect_role_membership(&role_id).contains(&member_id) {
1959 let group_role = state.get_role(&role_id);
1960 let member_role = state.get_role(&member_id);
1961 return Err(AdapterError::Catalog(Error::new(
1962 ErrorKind::CircularRoleMembership {
1963 role_name: group_role.name().to_string(),
1964 member_name: member_role.name().to_string(),
1965 },
1966 )));
1967 }
1968 let mut member_role = state.get_role(&member_id).clone();
1969 member_role.membership.map.insert(role_id, grantor_id);
1970 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1971
1972 CatalogState::add_to_audit_log(
1973 &state.system_configuration,
1974 oracle_write_ts,
1975 session,
1976 tx,
1977 audit_events,
1978 EventType::Grant,
1979 ObjectType::Role,
1980 EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1981 role_id: role_id.to_string(),
1982 member_id: member_id.to_string(),
1983 grantor_id: grantor_id.to_string(),
1984 executed_by: session
1985 .map(|session| session.authenticated_role_id())
1986 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1987 .to_string(),
1988 }),
1989 )?;
1990 }
1991 Op::RevokeRole {
1992 role_id,
1993 member_id,
1994 grantor_id,
1995 } => {
1996 state.ensure_not_reserved_role(&member_id)?;
1997 state.ensure_grantable_role(&role_id)?;
1998 let mut member_role = state.get_role(&member_id).clone();
1999 member_role.membership.map.remove(&role_id);
2000 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
2001
2002 CatalogState::add_to_audit_log(
2003 &state.system_configuration,
2004 oracle_write_ts,
2005 session,
2006 tx,
2007 audit_events,
2008 EventType::Revoke,
2009 ObjectType::Role,
2010 EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
2011 role_id: role_id.to_string(),
2012 member_id: member_id.to_string(),
2013 grantor_id: grantor_id.to_string(),
2014 executed_by: session
2015 .map(|session| session.authenticated_role_id())
2016 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
2017 .to_string(),
2018 }),
2019 )?;
2020 }
2021 Op::UpdatePrivilege {
2022 target_id,
2023 privilege,
2024 variant,
2025 } => {
2026 let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
2027 UpdatePrivilegeVariant::Grant => {
2028 privileges.grant(privilege);
2029 }
2030 UpdatePrivilegeVariant::Revoke => {
2031 privileges.revoke(&privilege);
2032 }
2033 };
2034 match &target_id {
2035 SystemObjectId::Object(object_id) => match object_id {
2036 ObjectId::Cluster(id) => {
2037 let mut cluster = state.get_cluster(*id).clone();
2038 update_privilege_fn(&mut cluster.privileges);
2039 tx.update_cluster(*id, cluster.into())?;
2040 }
2041 ObjectId::Database(id) => {
2042 let mut database = state.get_database(id).clone();
2043 update_privilege_fn(&mut database.privileges);
2044 tx.update_database(*id, database.into())?;
2045 }
2046 ObjectId::NetworkPolicy(id) => {
2047 let mut policy = state.get_network_policy(id).clone();
2048 update_privilege_fn(&mut policy.privileges);
2049 tx.update_network_policy(*id, policy.into())?;
2050 }
2051 ObjectId::Schema((database_spec, schema_spec)) => {
2052 let schema_id = schema_spec.clone().into();
2053 let mut schema = state
2054 .get_schema(
2055 database_spec,
2056 schema_spec,
2057 session
2058 .map(|session| session.conn_id())
2059 .unwrap_or(&SYSTEM_CONN_ID),
2060 )
2061 .clone();
2062 update_privilege_fn(&mut schema.privileges);
2063 tx.update_schema(schema_id, schema.into())?;
2064 }
2065 ObjectId::Item(id) => {
2066 let entry = state.get_entry(id);
2067 let mut new_entry = entry.clone();
2068 update_privilege_fn(&mut new_entry.privileges);
2069 if !new_entry.item().is_temporary() {
2070 tx.update_item(*id, new_entry.into())?;
2071 } else {
2072 temporary_item_updates
2073 .push((entry.clone().into(), StateDiff::Retraction));
2074 temporary_item_updates
2075 .push((new_entry.into(), StateDiff::Addition));
2076 }
2077 }
2078 ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
2079 },
2080 SystemObjectId::System => {
2081 let mut system_privileges = PrivilegeMap::clone(&state.system_privileges);
2082 update_privilege_fn(&mut system_privileges);
2083 let new_privilege =
2084 system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
2085 tx.set_system_privilege(
2086 privilege.grantee,
2087 privilege.grantor,
2088 new_privilege.map(|new_privilege| new_privilege.acl_mode),
2089 )?;
2090 }
2091 }
2092 let object_type = state.get_system_object_type(&target_id);
2093 let object_id_str = match &target_id {
2094 SystemObjectId::System => "SYSTEM".to_string(),
2095 SystemObjectId::Object(id) => id.to_string(),
2096 };
2097 CatalogState::add_to_audit_log(
2098 &state.system_configuration,
2099 oracle_write_ts,
2100 session,
2101 tx,
2102 audit_events,
2103 variant.into(),
2104 system_object_type_to_audit_object_type(&object_type),
2105 EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
2106 object_id: object_id_str,
2107 grantee_id: privilege.grantee.to_string(),
2108 grantor_id: privilege.grantor.to_string(),
2109 privileges: privilege.acl_mode.to_string(),
2110 }),
2111 )?;
2112 }
2113 Op::UpdateDefaultPrivilege {
2114 privilege_object,
2115 privilege_acl_item,
2116 variant,
2117 } => {
2118 let mut default_privileges = DefaultPrivileges::clone(&state.default_privileges);
2119 match variant {
2120 UpdatePrivilegeVariant::Grant => default_privileges
2121 .grant(privilege_object.clone(), privilege_acl_item.clone()),
2122 UpdatePrivilegeVariant::Revoke => {
2123 default_privileges.revoke(&privilege_object, &privilege_acl_item)
2124 }
2125 }
2126 let new_acl_mode = default_privileges
2127 .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
2128 tx.set_default_privilege(
2129 privilege_object.role_id,
2130 privilege_object.database_id,
2131 privilege_object.schema_id,
2132 privilege_object.object_type,
2133 privilege_acl_item.grantee,
2134 new_acl_mode.cloned(),
2135 )?;
2136 CatalogState::add_to_audit_log(
2137 &state.system_configuration,
2138 oracle_write_ts,
2139 session,
2140 tx,
2141 audit_events,
2142 variant.into(),
2143 object_type_to_audit_object_type(privilege_object.object_type),
2144 EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
2145 role_id: privilege_object.role_id.to_string(),
2146 database_id: privilege_object.database_id.map(|id| id.to_string()),
2147 schema_id: privilege_object.schema_id.map(|id| id.to_string()),
2148 grantee_id: privilege_acl_item.grantee.to_string(),
2149 privileges: privilege_acl_item.acl_mode.to_string(),
2150 }),
2151 )?;
2152 }
2153 Op::RenameCluster {
2154 id,
2155 name,
2156 to_name,
2157 check_reserved_names,
2158 } => {
2159 if id.is_system() {
2160 return Err(AdapterError::Catalog(Error::new(
2161 ErrorKind::ReadOnlyCluster(name.clone()),
2162 )));
2163 }
2164 if check_reserved_names && is_reserved_name(&to_name) {
2165 return Err(AdapterError::Catalog(Error::new(
2166 ErrorKind::ReservedClusterName(to_name),
2167 )));
2168 }
2169 tx.rename_cluster(id, &name, &to_name)?;
2170 CatalogState::add_to_audit_log(
2171 &state.system_configuration,
2172 oracle_write_ts,
2173 session,
2174 tx,
2175 audit_events,
2176 EventType::Alter,
2177 ObjectType::Cluster,
2178 EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
2179 id: id.to_string(),
2180 old_name: name.clone(),
2181 new_name: to_name.clone(),
2182 }),
2183 )?;
2184 info!("rename cluster {name} to {to_name}");
2185 }
2186 Op::RenameClusterReplica {
2187 cluster_id,
2188 replica_id,
2189 name,
2190 to_name,
2191 } => {
2192 if is_reserved_name(&to_name) {
2193 return Err(AdapterError::Catalog(Error::new(
2194 ErrorKind::ReservedReplicaName(to_name),
2195 )));
2196 }
2197 tx.rename_cluster_replica(replica_id, &name, &to_name)?;
2198 CatalogState::add_to_audit_log(
2199 &state.system_configuration,
2200 oracle_write_ts,
2201 session,
2202 tx,
2203 audit_events,
2204 EventType::Alter,
2205 ObjectType::ClusterReplica,
2206 EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
2207 cluster_id: cluster_id.to_string(),
2208 replica_id: replica_id.to_string(),
2209 old_name: name.replica.as_str().to_string(),
2210 new_name: to_name.clone(),
2211 }),
2212 )?;
2213 info!("rename cluster replica {name} to {to_name}");
2214 }
2215 Op::RenameItem {
2216 id,
2217 to_name,
2218 current_full_name,
2219 } => {
2220 let mut updates = Vec::new();
2221
2222 let entry = state.get_entry(&id);
2223 if let CatalogItem::Type(_) = entry.item() {
2224 return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
2225 current_full_name.to_string(),
2226 ))));
2227 }
2228
2229 if entry.id().is_system() {
2230 let name = state
2231 .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2232 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2233 name.to_string(),
2234 ))));
2235 }
2236
2237 let mut to_full_name = current_full_name.clone();
2238 to_full_name.item.clone_from(&to_name);
2239
2240 let mut to_qualified_name = entry.name().clone();
2241 to_qualified_name.item.clone_from(&to_name);
2242
2243 let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2244 id: id.to_string(),
2245 old_name: Self::full_name_detail(¤t_full_name),
2246 new_name: Self::full_name_detail(&to_full_name),
2247 });
2248 if Self::should_audit_log_item(entry.item()) {
2249 CatalogState::add_to_audit_log(
2250 &state.system_configuration,
2251 oracle_write_ts,
2252 session,
2253 tx,
2254 audit_events,
2255 EventType::Alter,
2256 catalog_type_to_audit_object_type(entry.item().typ()),
2257 details,
2258 )?;
2259 }
2260
2261 let mut new_entry = entry.clone();
2263 new_entry.name.item.clone_from(&to_name);
2264 new_entry.item = entry
2265 .item()
2266 .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2267 .map_err(|e| {
2268 Error::new(ErrorKind::from(AmbiguousRename {
2269 depender: state
2270 .resolve_full_name(entry.name(), entry.conn_id())
2271 .to_string(),
2272 dependee: state
2273 .resolve_full_name(entry.name(), entry.conn_id())
2274 .to_string(),
2275 message: e,
2276 }))
2277 })?;
2278
2279 for id in entry.referenced_by() {
2280 let dependent_item = state.get_entry(id);
2281 let mut to_entry = dependent_item.clone();
2282 to_entry.item = dependent_item
2283 .item()
2284 .rename_item_refs(
2285 current_full_name.clone(),
2286 to_full_name.item.clone(),
2287 false,
2288 )
2289 .map_err(|e| {
2290 Error::new(ErrorKind::from(AmbiguousRename {
2291 depender: state
2292 .resolve_full_name(
2293 dependent_item.name(),
2294 dependent_item.conn_id(),
2295 )
2296 .to_string(),
2297 dependee: state
2298 .resolve_full_name(entry.name(), entry.conn_id())
2299 .to_string(),
2300 message: e,
2301 }))
2302 })?;
2303
2304 if !to_entry.item().is_temporary() {
2305 tx.update_item(*id, to_entry.into())?;
2306 } else {
2307 temporary_item_updates
2308 .push((dependent_item.clone().into(), StateDiff::Retraction));
2309 temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2310 }
2311 updates.push(*id);
2312 }
2313 if !new_entry.item().is_temporary() {
2314 tx.update_item(id, new_entry.into())?;
2315 } else {
2316 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2317 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2318 }
2319
2320 updates.push(id);
2321 for id in updates {
2322 Self::log_update(state, &id);
2323 }
2324 }
2325 Op::RenameSchema {
2326 database_spec,
2327 schema_spec,
2328 new_name,
2329 check_reserved_names,
2330 } => {
2331 if check_reserved_names && is_reserved_name(&new_name) {
2332 return Err(AdapterError::Catalog(Error::new(
2333 ErrorKind::ReservedSchemaName(new_name),
2334 )));
2335 }
2336
2337 let conn_id = session
2338 .map(|session| session.conn_id())
2339 .unwrap_or(&SYSTEM_CONN_ID);
2340
2341 let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2342 let cur_name = schema.name().schema.clone();
2343
2344 let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2345 return Err(AdapterError::Catalog(Error::new(
2346 ErrorKind::AmbientSchemaRename(cur_name),
2347 )));
2348 };
2349 let database = state.get_database(&database_id);
2350 let database_name = &database.name;
2351
2352 let mut updates = Vec::new();
2353 let mut items_to_update = BTreeMap::new();
2354
2355 let mut update_item = |id| {
2356 if items_to_update.contains_key(id) {
2357 return Ok(());
2358 }
2359
2360 let entry = state.get_entry(id);
2361
2362 let mut new_entry = entry.clone();
2364 new_entry.item = entry
2365 .item
2366 .rename_schema_refs(database_name, &cur_name, &new_name)
2367 .map_err(|(s, _i)| {
2368 Error::new(ErrorKind::from(AmbiguousRename {
2369 depender: state
2370 .resolve_full_name(entry.name(), entry.conn_id())
2371 .to_string(),
2372 dependee: format!("{database_name}.{cur_name}"),
2373 message: format!("ambiguous reference to schema named {s}"),
2374 }))
2375 })?;
2376
2377 if !new_entry.item().is_temporary() {
2379 items_to_update.insert(*id, new_entry.into());
2380 } else {
2381 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2382 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2383 }
2384 updates.push(id);
2385
2386 Ok::<_, AdapterError>(())
2387 };
2388
2389 for (_name, item_id) in &schema.items {
2391 update_item(item_id)?;
2393
2394 for id in state.get_entry(item_id).referenced_by() {
2396 update_item(id)?;
2397 }
2398 }
2399 tx.update_items(items_to_update)?;
2402
2403 let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2405 let schema_name = schema.name().schema.clone();
2406 return Err(AdapterError::Catalog(crate::catalog::Error::new(
2407 crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2408 )));
2409 };
2410
2411 let database_name = database_spec
2413 .id()
2414 .map(|id| state.get_database(&id).name.clone());
2415 let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2416 id: schema_id.to_string(),
2417 old_name: schema.name().schema.clone(),
2418 new_name: new_name.clone(),
2419 database_name,
2420 });
2421 CatalogState::add_to_audit_log(
2422 &state.system_configuration,
2423 oracle_write_ts,
2424 session,
2425 tx,
2426 audit_events,
2427 EventType::Alter,
2428 mz_audit_log::ObjectType::Schema,
2429 details,
2430 )?;
2431
2432 let mut new_schema = schema.clone();
2434 new_schema.name.schema.clone_from(&new_name);
2435 tx.update_schema(schema_id, new_schema.into())?;
2436
2437 for id in updates {
2438 Self::log_update(state, id);
2439 }
2440 }
2441 Op::UpdateOwner { id, new_owner } => {
2442 let conn_id = session
2443 .map(|session| session.conn_id())
2444 .unwrap_or(&SYSTEM_CONN_ID);
2445 let old_owner = state
2446 .get_owner_id(&id, conn_id)
2447 .expect("cannot update the owner of an object without an owner");
2448 match &id {
2449 ObjectId::Cluster(id) => {
2450 let mut cluster = state.get_cluster(*id).clone();
2451 if id.is_system() {
2452 return Err(AdapterError::Catalog(Error::new(
2453 ErrorKind::ReadOnlyCluster(cluster.name),
2454 )));
2455 }
2456 Self::update_privilege_owners(
2457 &mut cluster.privileges,
2458 cluster.owner_id,
2459 new_owner,
2460 );
2461 cluster.owner_id = new_owner;
2462 tx.update_cluster(*id, cluster.into())?;
2463 }
2464 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2465 let cluster = state.get_cluster(*cluster_id);
2466 let mut replica = cluster
2467 .replica(*replica_id)
2468 .expect("catalog out of sync")
2469 .clone();
2470 if replica_id.is_system() {
2471 return Err(AdapterError::Catalog(Error::new(
2472 ErrorKind::ReadOnlyClusterReplica(replica.name),
2473 )));
2474 }
2475 replica.owner_id = new_owner;
2476 tx.update_cluster_replica(*replica_id, replica.into())?;
2477 }
2478 ObjectId::Database(id) => {
2479 let mut database = state.get_database(id).clone();
2480 if id.is_system() {
2481 return Err(AdapterError::Catalog(Error::new(
2482 ErrorKind::ReadOnlyDatabase(database.name),
2483 )));
2484 }
2485 Self::update_privilege_owners(
2486 &mut database.privileges,
2487 database.owner_id,
2488 new_owner,
2489 );
2490 database.owner_id = new_owner;
2491 tx.update_database(*id, database.clone().into())?;
2492 }
2493 ObjectId::Schema((database_spec, schema_spec)) => {
2494 let schema_id: SchemaId = schema_spec.clone().into();
2495 let mut schema = state
2496 .get_schema(database_spec, schema_spec, conn_id)
2497 .clone();
2498 if schema_id.is_system() {
2499 let name = schema.name();
2500 let full_name = state.resolve_full_schema_name(name);
2501 return Err(AdapterError::Catalog(Error::new(
2502 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2503 )));
2504 }
2505 Self::update_privilege_owners(
2506 &mut schema.privileges,
2507 schema.owner_id,
2508 new_owner,
2509 );
2510 schema.owner_id = new_owner;
2511 tx.update_schema(schema_id, schema.into())?;
2512 }
2513 ObjectId::Item(id) => {
2514 let entry = state.get_entry(id);
2515 let mut new_entry = entry.clone();
2516 if id.is_system() {
2517 let full_name = state.resolve_full_name(
2518 new_entry.name(),
2519 session.map(|session| session.conn_id()),
2520 );
2521 return Err(AdapterError::Catalog(Error::new(
2522 ErrorKind::ReadOnlyItem(full_name.to_string()),
2523 )));
2524 }
2525 Self::update_privilege_owners(
2526 &mut new_entry.privileges,
2527 new_entry.owner_id,
2528 new_owner,
2529 );
2530 new_entry.owner_id = new_owner;
2531 if !new_entry.item().is_temporary() {
2532 tx.update_item(*id, new_entry.into())?;
2533 } else {
2534 temporary_item_updates
2535 .push((entry.clone().into(), StateDiff::Retraction));
2536 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2537 }
2538 }
2539 ObjectId::NetworkPolicy(id) => {
2540 let mut policy = state.get_network_policy(id).clone();
2541 if id.is_system() {
2542 return Err(AdapterError::Catalog(Error::new(
2543 ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2544 )));
2545 }
2546 Self::update_privilege_owners(
2547 &mut policy.privileges,
2548 policy.owner_id,
2549 new_owner,
2550 );
2551 policy.owner_id = new_owner;
2552 tx.update_network_policy(*id, policy.into())?;
2553 }
2554 ObjectId::Role(_) => unreachable!("roles have no owner"),
2555 }
2556 let object_type = state.get_object_type(&id);
2557 CatalogState::add_to_audit_log(
2558 &state.system_configuration,
2559 oracle_write_ts,
2560 session,
2561 tx,
2562 audit_events,
2563 EventType::Alter,
2564 object_type_to_audit_object_type(object_type),
2565 EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2566 object_id: id.to_string(),
2567 old_owner_id: old_owner.to_string(),
2568 new_owner_id: new_owner.to_string(),
2569 }),
2570 )?;
2571 }
2572 Op::UpdateClusterConfig { id, name, config } => {
2573 let mut cluster = state.get_cluster(id).clone();
2574 cluster.config = config;
2575 tx.update_cluster(id, cluster.into())?;
2576 info!("update cluster {}", name);
2577
2578 CatalogState::add_to_audit_log(
2579 &state.system_configuration,
2580 oracle_write_ts,
2581 session,
2582 tx,
2583 audit_events,
2584 EventType::Alter,
2585 ObjectType::Cluster,
2586 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2587 id: id.to_string(),
2588 name,
2589 }),
2590 )?;
2591 }
2592 Op::UpdateClusterReplicaConfig {
2593 replica_id,
2594 cluster_id,
2595 config,
2596 } => {
2597 let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2598 info!("update replica {}", replica.name);
2599 tx.update_cluster_replica(
2600 replica_id,
2601 mz_catalog::durable::ClusterReplica {
2602 cluster_id,
2603 replica_id,
2604 name: replica.name.clone(),
2605 config: config.clone().into(),
2606 owner_id: replica.owner_id,
2607 },
2608 )?;
2609 }
2610 Op::UpdateItem { id, name, to_item } => {
2611 let mut entry = state.get_entry(&id).clone();
2612 entry.name = name.clone();
2613 entry.item = to_item.clone();
2614 tx.update_item(id, entry.into())?;
2615
2616 if Self::should_audit_log_item(&to_item) {
2617 let mut full_name = Self::full_name_detail(
2618 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2619 );
2620 full_name.item = name.item;
2621
2622 CatalogState::add_to_audit_log(
2623 &state.system_configuration,
2624 oracle_write_ts,
2625 session,
2626 tx,
2627 audit_events,
2628 EventType::Alter,
2629 catalog_type_to_audit_object_type(to_item.typ()),
2630 EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2631 id: id.to_string(),
2632 name: full_name,
2633 }),
2634 )?;
2635 }
2636
2637 Self::log_update(state, &id);
2638 }
2639 Op::UpdateSystemConfiguration { name, value } => {
2640 let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2641 tx.upsert_system_config(&name, parsed_value.clone())?;
2642 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2647 let with_0dt_deployment_max_wait =
2648 Duration::parse(VarInput::Flat(&parsed_value))
2649 .expect("parsing succeeded above");
2650 tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2651 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2652 let with_0dt_deployment_ddl_check_interval =
2653 Duration::parse(VarInput::Flat(&parsed_value))
2654 .expect("parsing succeeded above");
2655 tx.set_0dt_deployment_ddl_check_interval(
2656 with_0dt_deployment_ddl_check_interval,
2657 )?;
2658 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2659 let panic_after_timeout =
2660 strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2661 tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2662 }
2663
2664 CatalogState::add_to_audit_log(
2665 &state.system_configuration,
2666 oracle_write_ts,
2667 session,
2668 tx,
2669 audit_events,
2670 EventType::Alter,
2671 ObjectType::System,
2672 EventDetails::SetV1(mz_audit_log::SetV1 {
2673 name,
2674 value: Some(value.borrow().to_vec().join(", ")),
2675 }),
2676 )?;
2677 }
2678 Op::ResetSystemConfiguration { name } => {
2679 tx.remove_system_config(&name);
2680 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2685 tx.reset_0dt_deployment_max_wait()?;
2686 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2687 tx.reset_0dt_deployment_ddl_check_interval()?;
2688 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2689 tx.reset_enable_0dt_deployment_panic_after_timeout()?;
2690 }
2691
2692 CatalogState::add_to_audit_log(
2693 &state.system_configuration,
2694 oracle_write_ts,
2695 session,
2696 tx,
2697 audit_events,
2698 EventType::Alter,
2699 ObjectType::System,
2700 EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2701 )?;
2702 }
2703 Op::ResetAllSystemConfiguration => {
2704 tx.clear_system_configs();
2705 tx.reset_0dt_deployment_max_wait()?;
2706 tx.reset_0dt_deployment_ddl_check_interval()?;
2707 tx.reset_enable_0dt_deployment_panic_after_timeout()?;
2708
2709 CatalogState::add_to_audit_log(
2710 &state.system_configuration,
2711 oracle_write_ts,
2712 session,
2713 tx,
2714 audit_events,
2715 EventType::Alter,
2716 ObjectType::System,
2717 EventDetails::ResetAllV1,
2718 )?;
2719 }
2720 Op::UpdateScopedSystemParameters {
2721 scoped,
2722 prune_scope,
2723 } => {
2724 let live_clusters: BTreeSet<ClusterId> =
2737 tx.get_clusters().map(|cluster| cluster.id).collect();
2738 let live_replicas: BTreeSet<ReplicaId> = tx
2739 .get_cluster_replicas()
2740 .map(|replica| replica.replica_id)
2741 .collect();
2742 let prune_cluster = |id: &ClusterId| {
2743 prune_scope
2744 .as_ref()
2745 .map_or(true, |s| s.clusters.contains(id))
2746 };
2747 let prune_replica = |id: &ReplicaId| {
2748 prune_scope
2749 .as_ref()
2750 .map_or(true, |s| s.replicas.contains(id))
2751 };
2752
2753 let existing_cluster: BTreeMap<(ClusterId, String), String> = tx
2755 .get_cluster_system_configurations()
2756 .map(|c| ((c.cluster_id, c.name), c.value))
2757 .collect();
2758 let mut desired_cluster: BTreeSet<(ClusterId, String)> = BTreeSet::new();
2759 for (cluster_id, values) in &scoped.cluster {
2760 if !live_clusters.contains(cluster_id) {
2761 continue;
2762 }
2763 for (name, value) in values {
2764 desired_cluster.insert((*cluster_id, name.clone()));
2765 if existing_cluster.get(&(*cluster_id, name.clone())) != Some(value) {
2766 tx.upsert_cluster_system_config(*cluster_id, name, value.clone())?;
2767 }
2768 }
2769 }
2770 for (cluster_id, name) in existing_cluster.into_keys() {
2771 if (!live_clusters.contains(&cluster_id) || prune_cluster(&cluster_id))
2772 && !desired_cluster.contains(&(cluster_id, name.clone()))
2773 {
2774 tx.remove_cluster_system_config(cluster_id, &name);
2775 }
2776 }
2777
2778 let existing_replica: BTreeMap<(ReplicaId, String), String> = tx
2780 .get_replica_system_configurations()
2781 .map(|r| ((r.replica_id, r.name), r.value))
2782 .collect();
2783 let mut desired_replica: BTreeSet<(ReplicaId, String)> = BTreeSet::new();
2784 for (replica_id, values) in &scoped.replica {
2785 if !live_replicas.contains(replica_id) {
2786 continue;
2787 }
2788 for (name, value) in values {
2789 desired_replica.insert((*replica_id, name.clone()));
2790 if existing_replica.get(&(*replica_id, name.clone())) != Some(value) {
2791 tx.upsert_replica_system_config(*replica_id, name, value.clone())?;
2792 }
2793 }
2794 }
2795 for (replica_id, name) in existing_replica.into_keys() {
2796 if (!live_replicas.contains(&replica_id) || prune_replica(&replica_id))
2797 && !desired_replica.contains(&(replica_id, name.clone()))
2798 {
2799 tx.remove_replica_system_config(replica_id, &name);
2800 }
2801 }
2802 }
2803 Op::InjectAuditEvents { events } => {
2804 for event in events {
2805 let id = tx.allocate_audit_log_id()?;
2806 let ev = VersionedEvent::new(
2807 id,
2808 event.event_type,
2809 event.object_type,
2810 event.details,
2811 event.user,
2812 oracle_write_ts.into(),
2813 );
2814 audit_events.push(ev.clone());
2815 tx.insert_audit_log_event(ev);
2816 }
2817 }
2818 };
2819 Ok(temporary_item_updates)
2820 }
2821
2822 fn log_update(state: &CatalogState, id: &CatalogItemId) {
2823 let entry = state.get_entry(id);
2824 info!(
2825 "update {} {} ({})",
2826 entry.item_type(),
2827 state.resolve_full_name(entry.name(), entry.conn_id()),
2828 id
2829 );
2830 }
2831
2832 fn update_privilege_owners(
2836 privileges: &mut PrivilegeMap,
2837 old_owner: RoleId,
2838 new_owner: RoleId,
2839 ) {
2840 let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2842
2843 let mut new_present = false;
2844 for privilege in flat_privileges.iter_mut() {
2845 if privilege.grantor == old_owner {
2848 privilege.grantor = new_owner;
2849 } else if privilege.grantor == new_owner {
2850 new_present = true;
2851 }
2852 if privilege.grantee == old_owner {
2854 privilege.grantee = new_owner;
2855 } else if privilege.grantee == new_owner {
2856 new_present = true;
2857 }
2858 }
2859
2860 if new_present {
2864 let privilege_map: BTreeMap<_, Vec<_>> =
2866 flat_privileges
2867 .into_iter()
2868 .fold(BTreeMap::new(), |mut accum, privilege| {
2869 accum
2870 .entry((privilege.grantee, privilege.grantor))
2871 .or_default()
2872 .push(privilege);
2873 accum
2874 });
2875
2876 flat_privileges = privilege_map
2878 .into_iter()
2879 .map(|((grantee, grantor), values)|
2880 values.into_iter().fold(
2882 MzAclItem::empty(grantee, grantor),
2883 |mut accum, mz_aclitem| {
2884 accum.acl_mode =
2885 accum.acl_mode.union(mz_aclitem.acl_mode);
2886 accum
2887 },
2888 ))
2889 .collect();
2890 }
2891
2892 *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2893 }
2894}
2895
2896fn tx_replace_item(
2905 tx: &mut Transaction<'_>,
2906 state: &CatalogState,
2907 id: CatalogItemId,
2908 new_entry: CatalogEntry,
2909) -> Result<(), AdapterError> {
2910 let new_id = new_entry.id;
2911
2912 for use_id in new_entry.referenced_by() {
2914 if tx.get_item(use_id).is_none() {
2916 continue;
2917 }
2918
2919 let mut dependent = state.get_entry(use_id).clone();
2920 dependent.item = dependent.item.replace_item_refs(id, new_id);
2921 tx.update_item(*use_id, dependent.into())?;
2922 }
2923
2924 let old_comment_id = state.get_comment_id(ObjectId::Item(id));
2926 let new_comment_id = new_entry.comment_object_id();
2927 if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
2928 tx.drop_comments(&[old_comment_id].into())?;
2929 for (sub, comment) in comments {
2930 tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
2931 }
2932 }
2933
2934 let mz_catalog::durable::Item {
2935 id: _,
2936 oid,
2937 global_id,
2938 schema_id,
2939 name,
2940 create_sql,
2941 owner_id,
2942 privileges,
2943 extra_versions,
2944 } = new_entry.into();
2945
2946 tx.remove_item(id)?;
2947 tx.insert_item(
2948 new_id,
2949 oid,
2950 global_id,
2951 schema_id,
2952 &name,
2953 create_sql,
2954 owner_id,
2955 privileges,
2956 extra_versions,
2957 )?;
2958
2959 Ok(())
2960}
2961
2962fn apply_replacement_audit_events(
2964 state: &CatalogState,
2965 target: &CatalogEntry,
2966 replacement: &CatalogEntry,
2967) -> Vec<(EventType, EventDetails)> {
2968 let mut events = Vec::new();
2969
2970 let target_name = state.resolve_full_name(target.name(), target.conn_id());
2971 let target_id_name = IdFullNameV1 {
2972 id: target.id().to_string(),
2973 name: Catalog::full_name_detail(&target_name),
2974 };
2975 let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2976 let replacement_id_name = IdFullNameV1 {
2977 id: replacement.id().to_string(),
2978 name: Catalog::full_name_detail(&replacement_name),
2979 };
2980
2981 if Catalog::should_audit_log_item(&replacement.item) {
2982 events.push((
2983 EventType::Drop,
2984 EventDetails::IdFullNameV1(replacement_id_name.clone()),
2985 ));
2986 }
2987
2988 if Catalog::should_audit_log_item(&target.item) {
2989 events.push((
2990 EventType::Alter,
2991 EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2992 target: target_id_name.clone(),
2993 replacement: replacement_id_name,
2994 }),
2995 ));
2996
2997 if let Some(old_cluster_id) = target.cluster_id()
2998 && let Some(new_cluster_id) = replacement.cluster_id()
2999 && old_cluster_id != new_cluster_id
3000 {
3001 events.push((
3004 EventType::Alter,
3005 EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
3006 id: replacement.id().to_string(),
3007 name: target_id_name.name,
3008 old_cluster_id: old_cluster_id.to_string(),
3009 new_cluster_id: new_cluster_id.to_string(),
3010 }),
3011 ));
3012 }
3013 }
3014
3015 events
3016}
3017
/// The set of catalog objects to drop, grouped by object kind.
///
/// Populated from a sequence of [`DropObjectInfo`] values via [`ObjectsToDrop::generate`].
#[derive(Debug, Default)]
pub(crate) struct ObjectsToDrop {
    /// Comment object IDs recorded for every object added to this set.
    pub comments: BTreeSet<CommentObjectId>,
    /// Databases to drop.
    pub databases: BTreeSet<DatabaseId>,
    /// Schemas to drop, each mapped to the database specifier it was requested with.
    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
    /// Clusters to drop.
    pub clusters: BTreeSet<ClusterId>,
    /// Cluster replicas to drop, each mapped to its owning cluster and the drop reason.
    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
    /// Roles to drop.
    pub roles: BTreeSet<RoleId>,
    /// Catalog items to drop, in the order they were added. This includes items added
    /// implicitly as dependents of a dropped cluster replica.
    pub items: Vec<CatalogItemId>,
    /// Network policies to drop.
    pub network_policies: BTreeSet<NetworkPolicyId>,
}
3036
3037impl ObjectsToDrop {
3038 pub fn generate(
3039 drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
3040 state: &CatalogState,
3041 session: Option<&ConnMeta>,
3042 ) -> Result<Self, AdapterError> {
3043 let mut delta = ObjectsToDrop::default();
3044
3045 for drop_object_info in drop_object_infos {
3046 delta.add_item(drop_object_info, state, session)?;
3047 }
3048
3049 Ok(delta)
3050 }
3051
3052 fn add_item(
3053 &mut self,
3054 drop_object_info: DropObjectInfo,
3055 state: &CatalogState,
3056 session: Option<&ConnMeta>,
3057 ) -> Result<(), AdapterError> {
3058 self.comments
3059 .insert(state.get_comment_id(drop_object_info.to_object_id()));
3060
3061 match drop_object_info {
3062 DropObjectInfo::Database(database_id) => {
3063 let database = &state.database_by_id[&database_id];
3064 if database_id.is_system() {
3065 return Err(AdapterError::Catalog(Error::new(
3066 ErrorKind::ReadOnlyDatabase(database.name().to_string()),
3067 )));
3068 }
3069
3070 self.databases.insert(database_id);
3071 }
3072 DropObjectInfo::Schema((database_spec, schema_spec)) => {
3073 let schema = state.get_schema(
3074 &database_spec,
3075 &schema_spec,
3076 session
3077 .map(|session| session.conn_id())
3078 .unwrap_or(&SYSTEM_CONN_ID),
3079 );
3080 let schema_id: SchemaId = schema_spec.into();
3081 if schema_id.is_system() {
3082 let name = schema.name();
3083 let full_name = state.resolve_full_schema_name(name);
3084 return Err(AdapterError::Catalog(Error::new(
3085 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
3086 )));
3087 }
3088
3089 self.schemas.insert(schema_spec, database_spec);
3090 }
3091 DropObjectInfo::Role(role_id) => {
3092 let name = state.get_role(&role_id).name().to_string();
3093 if role_id.is_system() || role_id.is_predefined() {
3094 return Err(AdapterError::Catalog(Error::new(
3095 ErrorKind::ReservedRoleName(name.clone()),
3096 )));
3097 }
3098 state.ensure_not_reserved_role(&role_id)?;
3099
3100 self.roles.insert(role_id);
3101 }
3102 DropObjectInfo::Cluster(cluster_id) => {
3103 let cluster = state.get_cluster(cluster_id);
3104 let name = &cluster.name;
3105 if cluster_id.is_system() {
3106 return Err(AdapterError::Catalog(Error::new(
3107 ErrorKind::ReadOnlyCluster(name.clone()),
3108 )));
3109 }
3110
3111 self.clusters.insert(cluster_id);
3112 }
3113 DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
3114 let cluster = state.get_cluster(cluster_id);
3115 let replica = cluster.replica(replica_id).expect("Must exist");
3116
3117 self.replicas
3118 .insert(replica.replica_id, (cluster.id, reason));
3119
3120 let mut seen: BTreeSet<ObjectId> =
3139 self.items.iter().copied().map(ObjectId::Item).collect();
3140 for dep in state.cluster_replica_dependents(cluster_id, replica_id, &mut seen) {
3141 if let ObjectId::Item(dep_id) = dep {
3142 info!(
3143 "implicitly dropping {} because target replica was dropped",
3144 state.get_entry(&dep_id).name().item,
3145 );
3146 self.comments
3147 .insert(state.get_comment_id(ObjectId::Item(dep_id)));
3148 self.items.push(dep_id);
3149 }
3150 }
3151 }
3152 DropObjectInfo::Item(item_id) => {
3153 let entry = state.get_entry(&item_id);
3154 if item_id.is_system() {
3155 let name = entry.name();
3156 let full_name =
3157 state.resolve_full_name(name, session.map(|session| session.conn_id()));
3158 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
3159 full_name.to_string(),
3160 ))));
3161 }
3162
3163 self.items.push(item_id);
3164 }
3165 DropObjectInfo::NetworkPolicy(network_policy_id) => {
3166 let policy = state.get_network_policy(&network_policy_id);
3167 let name = &policy.name;
3168 if network_policy_id.is_system() {
3169 return Err(AdapterError::Catalog(Error::new(
3170 ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
3171 )));
3172 }
3173
3174 self.network_policies.insert(network_policy_id);
3175 }
3176 }
3177
3178 Ok(())
3179 }
3180}
3181
3182#[cfg(test)]
3183mod tests {
3184 use mz_catalog::SYSTEM_CONN_ID;
3185 use mz_catalog::memory::objects::{CatalogItem, Table, TableDataSource};
3186 use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
3187 use mz_repr::role_id::RoleId;
3188 use mz_repr::{RelationDesc, RelationVersion, VersionedRelationDesc};
3189 use mz_sql::DEFAULT_SCHEMA;
3190 use mz_sql::catalog::CatalogDatabase;
3191 use mz_sql::names::{
3192 ItemQualifiers, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
3193 };
3194 use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
3195
3196 use crate::catalog::{Catalog, Op};
3197 use crate::session::DEFAULT_DATABASE_NAME;
3198
3199 #[mz_ore::test]
3200 fn test_update_privilege_owners() {
3201 let old_owner = RoleId::User(1);
3202 let new_owner = RoleId::User(2);
3203 let other_role = RoleId::User(3);
3204
3205 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3207 MzAclItem {
3208 grantee: other_role,
3209 grantor: old_owner,
3210 acl_mode: AclMode::UPDATE,
3211 },
3212 MzAclItem {
3213 grantee: other_role,
3214 grantor: new_owner,
3215 acl_mode: AclMode::SELECT,
3216 },
3217 ]);
3218 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3219 assert_eq!(1, privileges.all_values().count());
3220 assert_eq!(
3221 vec![MzAclItem {
3222 grantee: other_role,
3223 grantor: new_owner,
3224 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3225 }],
3226 privileges.all_values_owned().collect::<Vec<_>>()
3227 );
3228
3229 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3231 MzAclItem {
3232 grantee: old_owner,
3233 grantor: other_role,
3234 acl_mode: AclMode::UPDATE,
3235 },
3236 MzAclItem {
3237 grantee: new_owner,
3238 grantor: other_role,
3239 acl_mode: AclMode::SELECT,
3240 },
3241 ]);
3242 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3243 assert_eq!(1, privileges.all_values().count());
3244 assert_eq!(
3245 vec![MzAclItem {
3246 grantee: new_owner,
3247 grantor: other_role,
3248 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3249 }],
3250 privileges.all_values_owned().collect::<Vec<_>>()
3251 );
3252
3253 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3255 MzAclItem {
3256 grantee: old_owner,
3257 grantor: old_owner,
3258 acl_mode: AclMode::UPDATE,
3259 },
3260 MzAclItem {
3261 grantee: new_owner,
3262 grantor: new_owner,
3263 acl_mode: AclMode::SELECT,
3264 },
3265 ]);
3266 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3267 assert_eq!(1, privileges.all_values().count());
3268 assert_eq!(
3269 vec![MzAclItem {
3270 grantee: new_owner,
3271 grantor: new_owner,
3272 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3273 }],
3274 privileges.all_values_owned().collect::<Vec<_>>()
3275 );
3276 }
3277
/// Checks that chaining `transact_incremental_dry_run` calls (feeding the state and snapshot
/// of one call into the next) yields the same visible result as a single call that receives
/// all ops at once.
///
/// Two `CREATE TABLE` ops (`t1`, `t2`) are used: `t1` is applied first, then `t2` on top of
/// the returned state/snapshot, and the outcome is compared to applying `[t1, t2]` together.
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] async fn test_transact_incremental_dry_run_processes_only_new_ops() {
    Catalog::with_debug(|catalog| async move {
        // Look up the default database and schema the test tables will live in.
        let database = catalog
            .resolve_database(DEFAULT_DATABASE_NAME)
            .expect("default database");
        let database_name = database.name.clone();
        let database_spec = ResolvedDatabaseSpecifier::Id(database.id());
        let schema = catalog
            .resolve_schema_in_database(&database_spec, DEFAULT_SCHEMA, &SYSTEM_CONN_ID)
            .expect("default schema");
        let schema_name = schema.name.schema.clone();
        let schema_spec = schema.id.clone();

        // Allocate a (catalog item ID, global ID) pair for each of the two tables.
        let (id_t1, global_id_t1) = catalog
            .allocate_user_id_for_test()
            .await
            .expect("allocate id for t1");
        let (id_t2, global_id_t2) = catalog
            .allocate_user_id_for_test()
            .await
            .expect("allocate id for t2");

        let oracle_write_ts = catalog.current_upper().await;

        // Builds an `Op::CreateItem` for an empty, system-owned table with the given name.
        let make_table_op = |id, global_id, name: &str| Op::CreateItem {
            id,
            name: QualifiedItemName {
                qualifiers: ItemQualifiers {
                    database_spec: database_spec.clone(),
                    schema_spec: schema_spec.clone(),
                },
                item: name.to_string(),
            },
            item: CatalogItem::Table(Table {
                create_sql: Some(format!(
                    "CREATE TABLE {database_name}.{schema_name}.{name} ()"
                )),
                desc: VersionedRelationDesc::new(RelationDesc::empty()),
                collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
                conn_id: None,
                resolved_ids: ResolvedIds::empty(),
                custom_logical_compaction_window: None,
                is_retained_metrics_object: false,
                data_source: TableDataSource::TableWrites { defaults: vec![] },
            }),
            owner_id: MZ_SYSTEM_ROLE_ID,
        };

        let op_t1 = make_table_op(id_t1, global_id_t1, "t1");
        let op_t2 = make_table_op(id_t2, global_id_t2, "t2");

        // Snapshot of the catalog state before any op is applied.
        let base_state = catalog.state().clone();

        // First dry run: only `t1`, starting from the base state and without a prior
        // snapshot. The returned snapshot is fed into the second dry run below.
        let (state_after_t1, snapshot_after_t1) = catalog
            .transact_incremental_dry_run(
                &base_state,
                vec![op_t1.clone()],
                None,
                None,
                oracle_write_ts,
            )
            .await
            .expect("first dry run");

        // After the first dry run only `t1` may be visible.
        assert!(
            state_after_t1.try_get_entry(&id_t1).is_some(),
            "t1 should exist after first dry run"
        );
        assert_eq!(
            state_after_t1
                .try_get_entry(&id_t1)
                .expect("t1 entry")
                .name()
                .item,
            "t1"
        );
        assert!(
            state_after_t1.try_get_entry(&id_t2).is_none(),
            "t2 should NOT exist after first dry run"
        );

        // Second dry run: only the new op `t2`, layered on the state and snapshot
        // produced by the first dry run.
        let (state_incremental, _) = catalog
            .transact_incremental_dry_run(
                &state_after_t1,
                vec![op_t2.clone()],
                None,
                Some(snapshot_after_t1),
                oracle_write_ts,
            )
            .await
            .expect("second dry run");

        // The incremental result must contain both tables.
        assert!(
            state_incremental.try_get_entry(&id_t1).is_some(),
            "t1 should exist in incremental result"
        );
        assert!(
            state_incremental.try_get_entry(&id_t2).is_some(),
            "t2 should exist in incremental result"
        );

        // Reference run: both ops applied in one call from the base state.
        let (state_all_at_once, _) = catalog
            .transact_incremental_dry_run(
                &base_state,
                vec![op_t1.clone(), op_t2.clone()],
                None,
                None,
                oracle_write_ts,
            )
            .await
            .expect("all-at-once dry run");

        assert!(
            state_all_at_once.try_get_entry(&id_t1).is_some(),
            "t1 should exist in all-at-once result"
        );
        assert!(
            state_all_at_once.try_get_entry(&id_t2).is_some(),
            "t2 should exist in all-at-once result"
        );

        // The incremental and all-at-once results must agree on name and owner.
        let inc_t1 = state_incremental.try_get_entry(&id_t1).expect("inc t1");
        let all_t1 = state_all_at_once.try_get_entry(&id_t1).expect("all t1");
        assert_eq!(inc_t1.name(), all_t1.name());
        assert_eq!(inc_t1.owner_id, all_t1.owner_id);

        let inc_t2 = state_incremental.try_get_entry(&id_t2).expect("inc t2");
        let all_t2 = state_all_at_once.try_get_entry(&id_t2).expect("all t2");
        assert_eq!(inc_t2.name(), all_t2.name());
        assert_eq!(inc_t2.owner_id, all_t2.owner_id);

        catalog.expire().await;
    })
    .await
}
3434
/// Exercises the pruning rules of `Op::UpdateScopedSystemParameters` for cluster-scoped
/// parameters.
///
/// The scenario uses two user clusters, A and B, and checks that:
/// * an upsert creates the cluster-scoped row;
/// * a prune whose `prune_scope` names only A leaves B's row in place;
/// * an undesired row of a live, in-scope cluster is removed;
/// * a row belonging to a dropped cluster is removed even when that cluster is not listed in
///   `prune_scope`.
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] async fn test_transact_update_scoped_system_parameters_prune() {
    use std::collections::{BTreeMap, BTreeSet};

    use mz_catalog::memory::objects::{ClusterConfig, ClusterVariant};
    use mz_controller_types::ClusterId;

    use crate::catalog::DropObjectInfo;
    use crate::config::{ScopedParameters, ScopedParametersScope};

    Catalog::with_debug(|mut catalog| async move {
        // The single parameter name/value pair used for every scoped override in this test.
        let param = "max_query_result_size".to_string();
        let value = "1MB".to_string();

        // Allocate IDs for the two user clusters, A and B.
        let commit_ts = catalog.current_upper().await;
        let cluster_a = catalog
            .allocate_user_cluster_id(commit_ts)
            .await
            .expect("allocate cluster A");
        let commit_ts = catalog.current_upper().await;
        let cluster_b = catalog
            .allocate_user_cluster_id(commit_ts)
            .await
            .expect("allocate cluster B");

        // Builds an `Op::CreateCluster` for an unmanaged, system-owned cluster.
        let make_cluster_op = |id: ClusterId, name: &str| Op::CreateCluster {
            id,
            name: name.to_string(),
            introspection_sources: Vec::new(),
            owner_id: MZ_SYSTEM_ROLE_ID,
            config: ClusterConfig {
                variant: ClusterVariant::Unmanaged,
                workload_class: None,
            },
        };

        let oracle_write_ts = catalog.current_upper().await;
        catalog
            .transact(
                None,
                oracle_write_ts,
                None,
                vec![
                    make_cluster_op(cluster_a, "test_cluster_a"),
                    make_cluster_op(cluster_b, "test_cluster_b"),
                ],
            )
            .await
            .expect("create clusters");

        // Reads the cluster-scoped system configuration rows through a fresh storage
        // transaction, as `(cluster id, name, value)` triples.
        async fn rows(catalog: &Catalog) -> BTreeSet<(ClusterId, String, String)> {
            let mut storage = catalog.storage().await;
            let tx = storage.transaction().await.expect("open transaction");
            tx.get_cluster_system_configurations()
                .map(|c| (c.cluster_id, c.name, c.value))
                .collect()
        }

        // A prune scope listing the given clusters and no replicas.
        let scope_with = |ids: &[ClusterId]| ScopedParametersScope {
            clusters: ids.iter().copied().collect(),
            replicas: BTreeSet::new(),
        };
        // Desired scoped parameters: `param = value` on exactly one cluster.
        let cluster_scoped = |id: ClusterId| {
            let mut cluster = BTreeMap::new();
            let mut overrides = BTreeMap::new();
            overrides.insert(param.clone(), value.clone());
            cluster.insert(id, overrides);
            ScopedParameters {
                cluster,
                replica: BTreeMap::new(),
            }
        };
        // Desired scoped parameters with no overrides at all.
        let empty_scoped = || ScopedParameters {
            cluster: BTreeMap::new(),
            replica: BTreeMap::new(),
        };

        // Step 1: upsert the override for A; its row must appear.
        let oracle_write_ts = catalog.current_upper().await;
        catalog
            .transact(
                None,
                oracle_write_ts,
                None,
                vec![Op::UpdateScopedSystemParameters {
                    scoped: cluster_scoped(cluster_a),
                    prune_scope: Some(scope_with(&[cluster_a])),
                }],
            )
            .await
            .expect("upsert A");
        assert!(
            rows(&catalog)
                .await
                .contains(&(cluster_a, param.clone(), value.clone())),
            "upsert must create the row for A"
        );

        // Step 2: upsert the override for B with a scope of only B.
        let oracle_write_ts = catalog.current_upper().await;
        catalog
            .transact(
                None,
                oracle_write_ts,
                None,
                vec![Op::UpdateScopedSystemParameters {
                    scoped: cluster_scoped(cluster_b),
                    prune_scope: Some(scope_with(&[cluster_b])),
                }],
            )
            .await
            .expect("upsert B");
        assert!(
            rows(&catalog)
                .await
                .contains(&(cluster_b, param.clone(), value.clone())),
            "setup must create the row for B"
        );

        // Step 3: nothing is desired and only A is in scope, so B (live, out of scope)
        // must keep its row.
        let oracle_write_ts = catalog.current_upper().await;
        catalog
            .transact(
                None,
                oracle_write_ts,
                None,
                vec![Op::UpdateScopedSystemParameters {
                    scoped: empty_scoped(),
                    prune_scope: Some(scope_with(&[cluster_a])),
                }],
            )
            .await
            .expect("bounded prune");
        assert!(
            rows(&catalog)
                .await
                .contains(&(cluster_b, param.clone(), value.clone())),
            "a live cluster outside prune_scope must keep its row"
        );

        // Step 4: A is in scope and not desired, so its row must be gone.
        // NOTE(review): this op is identical to the one in step 3, which presumably already
        // removed A's row; confirm whether this second transaction is intentional.
        let oracle_write_ts = catalog.current_upper().await;
        catalog
            .transact(
                None,
                oracle_write_ts,
                None,
                vec![Op::UpdateScopedSystemParameters {
                    scoped: empty_scoped(),
                    prune_scope: Some(scope_with(&[cluster_a])),
                }],
            )
            .await
            .expect("stale in-scope prune");
        assert!(
            !rows(&catalog)
                .await
                .iter()
                .any(|(id, _, _)| *id == cluster_a),
            "a live in-scope undesired row must be removed"
        );

        // Step 5: re-create A's row so that it exists when A is dropped below.
        let oracle_write_ts = catalog.current_upper().await;
        catalog
            .transact(
                None,
                oracle_write_ts,
                None,
                vec![Op::UpdateScopedSystemParameters {
                    scoped: cluster_scoped(cluster_a),
                    prune_scope: Some(scope_with(&[cluster_a])),
                }],
            )
            .await
            .expect("re-create A row");
        assert!(
            rows(&catalog)
                .await
                .contains(&(cluster_a, param.clone(), value.clone())),
            "re-created row for A must exist"
        );

        // Step 6: drop cluster A.
        let oracle_write_ts = catalog.current_upper().await;
        catalog
            .transact(
                None,
                oracle_write_ts,
                None,
                vec![Op::DropObjects(vec![DropObjectInfo::Cluster(cluster_a)])],
            )
            .await
            .expect("drop cluster A");

        // Step 7: an update scoped only to B must leave no row for the dropped cluster A,
        // even though A is not part of the prune scope.
        let oracle_write_ts = catalog.current_upper().await;
        catalog
            .transact(
                None,
                oracle_write_ts,
                None,
                vec![Op::UpdateScopedSystemParameters {
                    scoped: cluster_scoped(cluster_b),
                    prune_scope: Some(scope_with(&[cluster_b])),
                }],
            )
            .await
            .expect("orphan prune");
        assert!(
            !rows(&catalog)
                .await
                .iter()
                .any(|(id, _, _)| *id == cluster_a),
            "orphan row for a dropped cluster must be removed even when absent from prune_scope"
        );

        catalog.expire().await;
    })
    .await
}
3673}