1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22 WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26 ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Transaction};
31use mz_catalog::expr_cache::LocalExpressions;
32use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
33use mz_catalog::memory::objects::{
34 CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, DefaultPrivileges, SourceReferences,
35 StateDiff, StateUpdate, StateUpdateKind, TemporaryItem,
36};
37use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
38use mz_controller_types::{ClusterId, ReplicaId};
39use mz_ore::collections::HashSet;
40use mz_ore::instrument;
41use mz_ore::now::EpochMillis;
42use mz_persist_types::ShardId;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
44use mz_repr::network_policy_id::NetworkPolicyId;
45use mz_repr::optimize::OptimizerFeatures;
46use mz_repr::role_id::RoleId;
47use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
48use mz_sql::ast::RawDataType;
49use mz_sql::catalog::{
50 CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
51 CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, PasswordAction, PasswordConfig,
52 RoleAttributesRaw, RoleMembership, RoleVars,
53};
54use mz_sql::names::{
55 CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
56 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
57};
58use mz_sql::plan::{NetworkPolicyRule, PlanError};
59use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
60use mz_sql::session::vars::OwnedVarInput;
61use mz_sql::session::vars::{Value as VarValue, VarInput};
62use mz_sql::{DEFAULT_SCHEMA, rbac};
63use mz_sql_parser::ast::{QualifiedReplica, Value};
64use mz_storage_client::storage_collections::StorageCollections;
65use tracing::{info, trace};
66
67use crate::AdapterError;
68use crate::catalog::state::LocalExpressionCache;
69use crate::catalog::{
70 BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
71 catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
72 is_reserved_role_name, object_type_to_audit_object_type,
73 system_object_type_to_audit_object_type,
74};
75use crate::coord::ConnMeta;
76use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
77use crate::coord::cluster_scheduling::SchedulingDecision;
78use crate::util::ResultExt;
79
/// A single mutation of the catalog.
///
/// A list of `Op`s is passed to `Catalog::transact`, which applies them in order against one
/// durable catalog transaction.
#[derive(Debug, Clone)]
pub enum Op {
    /// Changes the retain-history setting of item `id`.
    ///
    /// `window` is handed to the item's `update_retain_history`; `value` is the raw option value
    /// and is what gets recorded as the new history in the audit log. Rejected with
    /// `ReadOnlyItem` when `id` is a system id.
    AlterRetainHistory {
        id: CatalogItemId,
        value: Option<Value>,
        window: CompactionWindow,
    },
    /// Changes the timestamp interval of item `id` via the item's `update_timestamp_interval`.
    ///
    /// Rejected with `ReadOnlyItem` when `id` is a system id.
    AlterSourceTimestampInterval {
        id: CatalogItemId,
        value: Option<Value>,
        interval: Duration,
    },
    /// Replaces the attributes and variables of role `id`.
    ///
    /// The password is cleared when `nopassword` is set, set when `attributes.password` is
    /// present, and left unchanged otherwise. `name` is used for the audit log entry.
    AlterRole {
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        nopassword: bool,
        vars: RoleVars,
    },
    /// Replaces the rules of network policy `id`.
    ///
    /// Fails if `name` is a reserved name. `owner_id` is ignored by the handler.
    AlterNetworkPolicy {
        id: NetworkPolicyId,
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    /// Adds the column `name` to the table `id`.
    ///
    /// `new_global_id` is recorded for the resulting table version and registered against the
    /// shard of the table's latest existing global id. Fails for items that are not tables.
    AlterAddColumn {
        id: CatalogItemId,
        new_global_id: GlobalId,
        name: ColumnName,
        typ: SqlColumnType,
        sql: RawDataType,
    },
    /// Applies the materialized view `replacement_id` as a replacement for the materialized
    /// view `id`. Both ids must refer to materialized views.
    AlterMaterializedViewApplyReplacement {
        id: CatalogItemId,
        replacement_id: CatalogItemId,
    },
    /// Creates a user database owned by `owner_id`, together with a `DEFAULT_SCHEMA` schema
    /// inside it.
    CreateDatabase {
        name: String,
        owner_id: RoleId,
    },
    /// Creates a user schema in `database_id`.
    ///
    /// Fails for reserved schema names and when `database_id` is
    /// `ResolvedDatabaseSpecifier::Ambient`.
    CreateSchema {
        database_id: ResolvedDatabaseSpecifier,
        schema_name: String,
        owner_id: RoleId,
    },
    /// Creates a user role. Fails for reserved role names.
    CreateRole {
        name: String,
        attributes: RoleAttributesRaw,
    },
    /// Creates a user cluster with the given introspection sources. Fails for reserved names.
    CreateCluster {
        id: ClusterId,
        name: String,
        introspection_sources: Vec<&'static BuiltinLog>,
        owner_id: RoleId,
        config: ClusterConfig,
    },
    /// Creates a replica of cluster `cluster_id`. Fails for reserved names.
    ///
    /// For managed replica locations an audit event is written that includes `reason`.
    CreateClusterReplica {
        cluster_id: ClusterId,
        name: String,
        config: ReplicaConfig,
        owner_id: RoleId,
        reason: ReplicaCreateDropReason,
    },
    /// Creates a catalog item.
    ///
    /// Items for which `item.conn_id()` is `Some` are tracked as temporary items by
    /// `Catalog::transact`.
    CreateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        item: CatalogItem,
        owner_id: RoleId,
    },
    // NOTE(review): the handlers for the variants from here up to and including
    // `WeirdStorageUsageUpdates` were not visible when these docs were written; they are left
    // undocumented (apart from `DropObjects`) rather than described from their names alone.
    CreateNetworkPolicy {
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    Comment {
        object_id: CommentObjectId,
        sub_component: Option<usize>,
        comment: Option<String>,
    },
    /// Drops the listed objects.
    ///
    /// `Catalog::transact` scans these up front to find the catalog items being dropped.
    DropObjects(Vec<DropObjectInfo>),
    GrantRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    RenameCluster {
        id: ClusterId,
        name: String,
        to_name: String,
        check_reserved_names: bool,
    },
    RenameClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: QualifiedReplica,
        to_name: String,
    },
    RenameItem {
        id: CatalogItemId,
        current_full_name: FullItemName,
        to_name: String,
    },
    RenameSchema {
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        new_name: String,
        check_reserved_names: bool,
    },
    UpdateOwner {
        id: ObjectId,
        new_owner: RoleId,
    },
    UpdatePrivilege {
        target_id: SystemObjectId,
        privilege: MzAclItem,
        variant: UpdatePrivilegeVariant,
    },
    UpdateDefaultPrivilege {
        privilege_object: DefaultPrivilegeObject,
        privilege_acl_item: DefaultPrivilegeAclItem,
        variant: UpdatePrivilegeVariant,
    },
    RevokeRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    UpdateClusterConfig {
        id: ClusterId,
        name: String,
        config: ClusterConfig,
    },
    UpdateClusterReplicaConfig {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        config: ReplicaConfig,
    },
    UpdateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        to_item: CatalogItem,
    },
    UpdateSourceReferences {
        source_id: CatalogItemId,
        references: SourceReferences,
    },
    UpdateSystemConfiguration {
        name: String,
        value: OwnedVarInput,
    },
    ResetSystemConfiguration {
        name: String,
    },
    ResetAllSystemConfiguration,
    WeirdStorageUsageUpdates {
        object_id: Option<String>,
        size_bytes: u64,
        collection_timestamp: EpochMillis,
    },
    /// Requests a dry run: the preceding ops are applied to the transaction and to an in-memory
    /// copy of the catalog state, after which `AdapterError::TransactionDryRun` is returned
    /// carrying those ops and the resulting state.
    ///
    /// Must be the last element of the op list and must not be the only element; both
    /// conditions are enforced with panics.
    TransactionDryRun,
}
252
/// Identifies one object to be dropped by [`Op::DropObjects`].
///
/// The variants mirror those of `ObjectId`, except that a cluster replica additionally carries
/// the [`ReplicaCreateDropReason`] explaining why it is being dropped.
#[derive(Debug, Clone)]
pub enum DropObjectInfo {
    Cluster(ClusterId),
    /// The replica to drop, identified by its cluster and replica id, plus the drop reason.
    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
    Database(DatabaseId),
    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
    Role(RoleId),
    Item(CatalogItemId),
    NetworkPolicy(NetworkPolicyId),
}
266
267impl DropObjectInfo {
268 pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
271 match id {
272 ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
273 ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
274 cluster_id,
275 replica_id,
276 ReplicaCreateDropReason::Manual,
277 )),
278 ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
279 ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
280 ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
281 ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
282 ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
283 }
284 }
285
286 fn to_object_id(&self) -> ObjectId {
289 match &self {
290 DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
291 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
292 ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
293 }
294 DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
295 DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
296 DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
297 DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
298 DropObjectInfo::NetworkPolicy(network_policy_id) => {
299 ObjectId::NetworkPolicy(network_policy_id.clone())
300 }
301 }
302 }
303}
304
/// The reason a cluster replica is being created or dropped, as recorded in the audit log.
#[derive(Debug, Clone)]
pub enum ReplicaCreateDropReason {
    /// Reported to the audit log as `CreateOrDropClusterReplicaReasonV1::Manual`.
    Manual,
    /// Reported to the audit log as `CreateOrDropClusterReplicaReasonV1::Schedule`, together
    /// with the contained scheduling decisions.
    ClusterScheduling(Vec<SchedulingDecision>),
}
317
318impl ReplicaCreateDropReason {
319 pub fn into_audit_log(
320 self,
321 ) -> (
322 CreateOrDropClusterReplicaReasonV1,
323 Option<SchedulingDecisionsWithReasonsV2>,
324 ) {
325 let (reason, scheduling_policies) = match self {
326 ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
327 ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
328 CreateOrDropClusterReplicaReasonV1::Schedule,
329 Some(scheduling_decisions),
330 ),
331 };
332 (
333 reason,
334 scheduling_policies
335 .as_ref()
336 .map(SchedulingDecision::reasons_to_audit_log_reasons),
337 )
338 }
339}
340
/// Everything a successful `Catalog::transact` call hands back to its caller.
pub struct TransactionResult {
    /// Builtin table updates collected while applying the transaction, including retractions
    /// of optimizer notices for dropped objects.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Parsed catalog state updates produced by applying the transaction's updates to the
    /// in-memory state.
    pub catalog_updates: Vec<ParsedStateUpdate>,
    /// Audit log events recorded by the ops of the transaction.
    pub audit_events: Vec<VersionedEvent>,
}
347
348impl Catalog {
/// Returns whether changes to `item` should be written to the audit log.
///
/// This is the case for every item that is not temporary.
fn should_audit_log_item(item: &CatalogItem) -> bool {
    !item.is_temporary()
}
352
353 fn temporary_ids(
356 &self,
357 ops: &[Op],
358 temporary_drops: BTreeSet<(&ConnectionId, String)>,
359 ) -> Result<BTreeSet<CatalogItemId>, Error> {
360 let mut creating = BTreeSet::new();
361 let mut temporary_ids = BTreeSet::new();
362 for op in ops.iter() {
363 if let Op::CreateItem {
364 id,
365 name,
366 item,
367 owner_id: _,
368 } = op
369 {
370 if let Some(conn_id) = item.conn_id() {
371 if self.item_exists_in_temp_schemas(conn_id, &name.item)
372 && !temporary_drops.contains(&(conn_id, name.item.clone()))
373 || creating.contains(&(conn_id, &name.item))
374 {
375 return Err(
376 SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
377 );
378 } else {
379 creating.insert((conn_id, &name.item));
380 temporary_ids.insert(id.clone());
381 }
382 }
383 }
384 }
385 Ok(temporary_ids)
386 }
387
388 #[instrument(name = "catalog::transact")]
389 pub async fn transact(
390 &mut self,
391 storage_collections: Option<
394 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
395 >,
396 oracle_write_ts: mz_repr::Timestamp,
397 session: Option<&ConnMeta>,
398 ops: Vec<Op>,
399 ) -> Result<TransactionResult, AdapterError> {
400 trace!("transact: {:?}", ops);
401 fail::fail_point!("catalog_transact", |arg| {
402 Err(AdapterError::Unstructured(anyhow::anyhow!(
403 "failpoint: {arg:?}"
404 )))
405 });
406
407 let drop_ids: BTreeSet<CatalogItemId> = ops
408 .iter()
409 .filter_map(|op| match op {
410 Op::DropObjects(drop_object_infos) => {
411 let ids = drop_object_infos.iter().map(|info| info.to_object_id());
412 let item_ids = ids.filter_map(|id| match id {
413 ObjectId::Item(id) => Some(id),
414 _ => None,
415 });
416 Some(item_ids)
417 }
418 _ => None,
419 })
420 .flatten()
421 .collect();
422 let temporary_drops = drop_ids
423 .iter()
424 .filter_map(|id| {
425 let entry = self.get_entry(id);
426 match entry.item().conn_id() {
427 Some(conn_id) => Some((conn_id, entry.name().item.clone())),
428 None => None,
429 }
430 })
431 .collect();
432 let dropped_global_ids = drop_ids
433 .iter()
434 .flat_map(|item_id| self.get_global_ids(item_id))
435 .collect();
436
437 let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
438 let mut builtin_table_updates = vec![];
439 let mut catalog_updates = vec![];
440 let mut audit_events = vec![];
441 let mut storage = self.storage().await;
442 let mut tx = storage
443 .transaction()
444 .await
445 .unwrap_or_terminate("starting catalog transaction");
446
447 let new_state = Self::transact_inner(
448 storage_collections,
449 oracle_write_ts,
450 session,
451 ops,
452 temporary_ids,
453 &mut builtin_table_updates,
454 &mut catalog_updates,
455 &mut audit_events,
456 &mut tx,
457 &self.state,
458 )
459 .await?;
460
461 tx.commit(oracle_write_ts)
466 .await
467 .unwrap_or_terminate("catalog storage transaction commit must succeed");
468
469 drop(storage);
472 if let Some(new_state) = new_state {
473 self.transient_revision += 1;
474 self.state = new_state;
475 }
476
477 let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
479 if self.state.system_config().enable_mz_notices() {
480 self.state().pack_optimizer_notices(
482 &mut builtin_table_updates,
483 dropped_notices.iter(),
484 Diff::MINUS_ONE,
485 );
486 }
487
488 Ok(TransactionResult {
489 builtin_table_updates,
490 catalog_updates,
491 audit_events,
492 })
493 }
494
495 fn extract_expressions_from_ops(
499 ops: &[Op],
500 optimizer_features: &OptimizerFeatures,
501 ) -> BTreeMap<GlobalId, LocalExpressions> {
502 let mut exprs = BTreeMap::new();
503
504 for op in ops {
505 if let Op::CreateItem { item, .. } = op {
506 match item {
507 CatalogItem::View(view) => {
508 exprs.insert(
509 view.global_id,
510 LocalExpressions {
511 local_mir: (*view.optimized_expr).clone(),
512 optimizer_features: optimizer_features.clone(),
513 },
514 );
515 }
516 CatalogItem::MaterializedView(mv) => {
517 exprs.insert(
518 mv.global_id_writes(),
519 LocalExpressions {
520 local_mir: (*mv.optimized_expr).clone(),
521 optimizer_features: optimizer_features.clone(),
522 },
523 );
524 }
525 _ => {}
526 }
527 }
528 }
529
530 exprs
531 }
532
533 #[instrument(name = "catalog::transact_inner")]
541 async fn transact_inner(
542 storage_collections: Option<
543 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
544 >,
545 oracle_write_ts: mz_repr::Timestamp,
546 session: Option<&ConnMeta>,
547 mut ops: Vec<Op>,
548 temporary_ids: BTreeSet<CatalogItemId>,
549 builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
550 parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
551 audit_events: &mut Vec<VersionedEvent>,
552 tx: &mut Transaction<'_>,
553 state: &CatalogState,
554 ) -> Result<Option<CatalogState>, AdapterError> {
555 let mut preliminary_state = Cow::Borrowed(state);
582
583 let mut state = Cow::Borrowed(state);
585
586 let dry_run_ops = match ops.last() {
587 Some(Op::TransactionDryRun) => {
588 ops.pop();
590 assert!(!ops.is_empty(), "TransactionDryRun must not be the only op");
591 ops.clone()
592 }
593 Some(_) => vec![],
594 None => return Ok(None),
595 };
596
597 let optimizer_features = OptimizerFeatures::from(state.system_config());
600 let cached_exprs = Self::extract_expressions_from_ops(&ops, &optimizer_features);
601
602 let mut storage_collections_to_create = BTreeSet::new();
603 let mut storage_collections_to_drop = BTreeSet::new();
604 let mut storage_collections_to_register = BTreeMap::new();
605
606 let mut updates = Vec::new();
607
608 for op in ops {
609 let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
610 oracle_write_ts,
611 session,
612 op,
613 &temporary_ids,
614 audit_events,
615 tx,
616 &*preliminary_state,
617 &mut storage_collections_to_create,
618 &mut storage_collections_to_drop,
619 &mut storage_collections_to_register,
620 )
621 .await?;
622
623 if let Some(builtin_table_update) = weird_builtin_table_update {
632 builtin_table_updates.push(builtin_table_update);
633 }
634
635 let upper = tx.upper();
640 let temporary_item_updates =
641 temporary_item_updates
642 .into_iter()
643 .map(|(item, diff)| StateUpdate {
644 kind: StateUpdateKind::TemporaryItem(item),
645 ts: upper,
646 diff,
647 });
648
649 let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
650 op_updates.extend(temporary_item_updates);
651 if !op_updates.is_empty() {
652 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
655 let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
656 .to_mut()
657 .apply_updates(op_updates.clone(), &mut local_expr_cache)
658 .await;
659 }
660 updates.append(&mut op_updates);
661 }
662
663 if !updates.is_empty() {
664 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
665 let (op_builtin_table_updates, op_catalog_updates) = state
666 .to_mut()
667 .apply_updates(updates.clone(), &mut local_expr_cache)
668 .await;
669 let op_builtin_table_updates = state
670 .to_mut()
671 .resolve_builtin_table_updates(op_builtin_table_updates);
672 builtin_table_updates.extend(op_builtin_table_updates);
673 parsed_catalog_updates.extend(op_catalog_updates);
674 }
675
676 if dry_run_ops.is_empty() {
677 if let Some(c) = storage_collections {
679 c.prepare_state(
680 tx,
681 storage_collections_to_create,
682 storage_collections_to_drop,
683 storage_collections_to_register,
684 )
685 .await?;
686 }
687
688 let updates = tx.get_and_commit_op_updates();
689 if !updates.is_empty() {
690 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
691 let (op_builtin_table_updates, op_catalog_updates) = state
692 .to_mut()
693 .apply_updates(updates.clone(), &mut local_expr_cache)
694 .await;
695 let op_builtin_table_updates = state
696 .to_mut()
697 .resolve_builtin_table_updates(op_builtin_table_updates);
698 builtin_table_updates.extend(op_builtin_table_updates);
699 parsed_catalog_updates.extend(op_catalog_updates);
700 }
701
702 match state {
703 Cow::Owned(state) => Ok(Some(state)),
704 Cow::Borrowed(_) => Ok(None),
705 }
706 } else {
707 Err(AdapterError::TransactionDryRun {
708 new_ops: dry_run_ops,
709 new_state: state.into_owned(),
710 })
711 }
712 }
713
714 #[instrument]
722 async fn transact_op(
723 oracle_write_ts: mz_repr::Timestamp,
724 session: Option<&ConnMeta>,
725 op: Op,
726 temporary_ids: &BTreeSet<CatalogItemId>,
727 audit_events: &mut Vec<VersionedEvent>,
728 tx: &mut Transaction<'_>,
729 state: &CatalogState,
730 storage_collections_to_create: &mut BTreeSet<GlobalId>,
731 storage_collections_to_drop: &mut BTreeSet<GlobalId>,
732 storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
733 ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
734 let mut weird_builtin_table_update = None;
735 let mut temporary_item_updates = Vec::new();
736
737 match op {
738 Op::TransactionDryRun => {
739 unreachable!("TransactionDryRun can only be used a final element of ops")
740 }
741 Op::AlterRetainHistory { id, value, window } => {
742 let entry = state.get_entry(&id);
743 if id.is_system() {
744 let name = entry.name();
745 let full_name =
746 state.resolve_full_name(name, session.map(|session| session.conn_id()));
747 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
748 full_name.to_string(),
749 ))));
750 }
751
752 let mut new_entry = entry.clone();
753 let previous = new_entry
754 .item
755 .update_retain_history(value.clone(), window)
756 .map_err(|_| {
757 AdapterError::Catalog(Error::new(ErrorKind::Internal(
758 "planner should have rejected invalid alter retain history item type"
759 .to_string(),
760 )))
761 })?;
762
763 if Self::should_audit_log_item(new_entry.item()) {
764 let details =
765 EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
766 id: id.to_string(),
767 old_history: previous.map(|previous| previous.to_string()),
768 new_history: value.map(|v| v.to_string()),
769 });
770 CatalogState::add_to_audit_log(
771 &state.system_configuration,
772 oracle_write_ts,
773 session,
774 tx,
775 audit_events,
776 EventType::Alter,
777 catalog_type_to_audit_object_type(new_entry.item().typ()),
778 details,
779 )?;
780 }
781
782 tx.update_item(id, new_entry.into())?;
783
784 Self::log_update(state, &id);
785 }
786 Op::AlterSourceTimestampInterval {
787 id,
788 value,
789 interval,
790 } => {
791 let entry = state.get_entry(&id);
792 if id.is_system() {
793 let name = entry.name();
794 let full_name =
795 state.resolve_full_name(name, session.map(|session| session.conn_id()));
796 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
797 full_name.to_string(),
798 ))));
799 }
800
801 let mut new_entry = entry.clone();
802 new_entry
803 .item
804 .update_timestamp_interval(value, interval)
805 .map_err(|_| {
806 AdapterError::Catalog(Error::new(ErrorKind::Internal(
807 "planner should have rejected invalid alter timestamp interval item type"
808 .to_string(),
809 )))
810 })?;
811
812 tx.update_item(id, new_entry.into())?;
813
814 Self::log_update(state, &id);
815 }
816 Op::AlterRole {
817 id,
818 name,
819 attributes,
820 nopassword,
821 vars,
822 } => {
823 state.ensure_not_reserved_role(&id)?;
824
825 let mut existing_role = state.get_role(&id).clone();
826 let password = attributes.password.clone();
827 let scram_iterations = attributes
828 .scram_iterations
829 .unwrap_or_else(|| state.system_config().scram_iterations());
830 existing_role.attributes = attributes.into();
831 existing_role.vars = vars;
832 let password_action = if nopassword {
833 PasswordAction::Clear
834 } else if let Some(password) = password {
835 PasswordAction::Set(PasswordConfig {
836 password,
837 scram_iterations,
838 })
839 } else {
840 PasswordAction::NoChange
841 };
842 tx.update_role(id, existing_role.into(), password_action)?;
843
844 CatalogState::add_to_audit_log(
845 &state.system_configuration,
846 oracle_write_ts,
847 session,
848 tx,
849 audit_events,
850 EventType::Alter,
851 ObjectType::Role,
852 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
853 id: id.to_string(),
854 name: name.clone(),
855 }),
856 )?;
857
858 info!("update role {name} ({id})");
859 }
860 Op::AlterNetworkPolicy {
861 id,
862 rules,
863 name,
864 owner_id: _owner_id,
865 } => {
866 let existing_policy = state.get_network_policy(&id).clone();
867 let mut policy: NetworkPolicy = existing_policy.into();
868 policy.rules = rules;
869 if is_reserved_name(&name) {
870 return Err(AdapterError::Catalog(Error::new(
871 ErrorKind::ReservedNetworkPolicyName(name),
872 )));
873 }
874 tx.update_network_policy(id, policy.clone())?;
875
876 CatalogState::add_to_audit_log(
877 &state.system_configuration,
878 oracle_write_ts,
879 session,
880 tx,
881 audit_events,
882 EventType::Alter,
883 ObjectType::NetworkPolicy,
884 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
885 id: id.to_string(),
886 name: name.clone(),
887 }),
888 )?;
889
890 info!("update network policy {name} ({id})");
891 }
892 Op::AlterAddColumn {
893 id,
894 new_global_id,
895 name,
896 typ,
897 sql,
898 } => {
899 let mut new_entry = state.get_entry(&id).clone();
900 let version = new_entry.item.add_column(name, typ, sql)?;
901 let shard_id = state
904 .storage_metadata()
905 .get_collection_shard(new_entry.latest_global_id())?;
906
907 let CatalogItem::Table(table) = &mut new_entry.item else {
909 return Err(AdapterError::Unsupported("adding columns to non-Table"));
910 };
911 table.collections.insert(version, new_global_id);
912
913 tx.update_item(id, new_entry.into())?;
914 storage_collections_to_register.insert(new_global_id, shard_id);
915 }
916 Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
917 let mut new_entry = state.get_entry(&id).clone();
918 let replacement = state.get_entry(&replacement_id);
919
920 let new_audit_events =
921 apply_replacement_audit_events(state, &new_entry, replacement);
922
923 let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
924 return Err(AdapterError::internal(
925 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
926 "id must refer to a materialized view",
927 ));
928 };
929 let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
930 return Err(AdapterError::internal(
931 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
932 "replacement_id must refer to a materialized view",
933 ));
934 };
935
936 mv.apply_replacement(replacement_mv.clone());
937
938 tx.remove_item(replacement_id)?;
939
940 new_entry.id = replacement_id;
941 tx_replace_item(tx, state, id, new_entry)?;
942
943 let comment_id = CommentObjectId::MaterializedView(replacement_id);
944 tx.drop_comments(&[comment_id].into())?;
945
946 for (event_type, details) in new_audit_events {
947 CatalogState::add_to_audit_log(
948 &state.system_configuration,
949 oracle_write_ts,
950 session,
951 tx,
952 audit_events,
953 event_type,
954 ObjectType::MaterializedView,
955 details,
956 )?;
957 }
958 }
959 Op::CreateDatabase { name, owner_id } => {
960 let database_owner_privileges = vec![rbac::owner_privilege(
961 mz_sql::catalog::ObjectType::Database,
962 owner_id,
963 )];
964 let database_default_privileges = state
965 .default_privileges
966 .get_applicable_privileges(
967 owner_id,
968 None,
969 None,
970 mz_sql::catalog::ObjectType::Database,
971 )
972 .map(|item| item.mz_acl_item(owner_id));
973 let database_privileges: Vec<_> = merge_mz_acl_items(
974 database_owner_privileges
975 .into_iter()
976 .chain(database_default_privileges),
977 )
978 .collect();
979
980 let schema_owner_privileges = vec![rbac::owner_privilege(
981 mz_sql::catalog::ObjectType::Schema,
982 owner_id,
983 )];
984 let schema_default_privileges = state
985 .default_privileges
986 .get_applicable_privileges(
987 owner_id,
988 None,
989 None,
990 mz_sql::catalog::ObjectType::Schema,
991 )
992 .map(|item| item.mz_acl_item(owner_id))
993 .chain(std::iter::once(MzAclItem {
995 grantee: RoleId::Public,
996 grantor: owner_id,
997 acl_mode: AclMode::USAGE,
998 }));
999 let schema_privileges: Vec<_> = merge_mz_acl_items(
1000 schema_owner_privileges
1001 .into_iter()
1002 .chain(schema_default_privileges),
1003 )
1004 .collect();
1005
1006 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1007 let (database_id, _) = tx.insert_user_database(
1008 &name,
1009 owner_id,
1010 database_privileges.clone(),
1011 &temporary_oids,
1012 )?;
1013 let (schema_id, _) = tx.insert_user_schema(
1014 database_id,
1015 DEFAULT_SCHEMA,
1016 owner_id,
1017 schema_privileges.clone(),
1018 &temporary_oids,
1019 )?;
1020 CatalogState::add_to_audit_log(
1021 &state.system_configuration,
1022 oracle_write_ts,
1023 session,
1024 tx,
1025 audit_events,
1026 EventType::Create,
1027 ObjectType::Database,
1028 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1029 id: database_id.to_string(),
1030 name: name.clone(),
1031 }),
1032 )?;
1033 info!("create database {}", name);
1034
1035 CatalogState::add_to_audit_log(
1036 &state.system_configuration,
1037 oracle_write_ts,
1038 session,
1039 tx,
1040 audit_events,
1041 EventType::Create,
1042 ObjectType::Schema,
1043 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1044 id: schema_id.to_string(),
1045 name: DEFAULT_SCHEMA.to_string(),
1046 database_name: Some(name),
1047 }),
1048 )?;
1049 }
1050 Op::CreateSchema {
1051 database_id,
1052 schema_name,
1053 owner_id,
1054 } => {
1055 if is_reserved_name(&schema_name) {
1056 return Err(AdapterError::Catalog(Error::new(
1057 ErrorKind::ReservedSchemaName(schema_name),
1058 )));
1059 }
1060 let database_id = match database_id {
1061 ResolvedDatabaseSpecifier::Id(id) => id,
1062 ResolvedDatabaseSpecifier::Ambient => {
1063 return Err(AdapterError::Catalog(Error::new(
1064 ErrorKind::ReadOnlySystemSchema(schema_name),
1065 )));
1066 }
1067 };
1068 let owner_privileges = vec![rbac::owner_privilege(
1069 mz_sql::catalog::ObjectType::Schema,
1070 owner_id,
1071 )];
1072 let default_privileges = state
1073 .default_privileges
1074 .get_applicable_privileges(
1075 owner_id,
1076 Some(database_id),
1077 None,
1078 mz_sql::catalog::ObjectType::Schema,
1079 )
1080 .map(|item| item.mz_acl_item(owner_id));
1081 let privileges: Vec<_> =
1082 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1083 .collect();
1084 let (schema_id, _) = tx.insert_user_schema(
1085 database_id,
1086 &schema_name,
1087 owner_id,
1088 privileges.clone(),
1089 &state.get_temporary_oids().collect(),
1090 )?;
1091 CatalogState::add_to_audit_log(
1092 &state.system_configuration,
1093 oracle_write_ts,
1094 session,
1095 tx,
1096 audit_events,
1097 EventType::Create,
1098 ObjectType::Schema,
1099 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1100 id: schema_id.to_string(),
1101 name: schema_name.clone(),
1102 database_name: Some(state.database_by_id[&database_id].name.clone()),
1103 }),
1104 )?;
1105 }
1106 Op::CreateRole { name, attributes } => {
1107 if is_reserved_role_name(&name) {
1108 return Err(AdapterError::Catalog(Error::new(
1109 ErrorKind::ReservedRoleName(name),
1110 )));
1111 }
1112 let membership = RoleMembership::new();
1113 let vars = RoleVars::default();
1114 let (id, _) = tx.insert_user_role(
1115 name.clone(),
1116 attributes.clone(),
1117 membership.clone(),
1118 vars.clone(),
1119 &state.get_temporary_oids().collect(),
1120 )?;
1121 CatalogState::add_to_audit_log(
1122 &state.system_configuration,
1123 oracle_write_ts,
1124 session,
1125 tx,
1126 audit_events,
1127 EventType::Create,
1128 ObjectType::Role,
1129 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1130 id: id.to_string(),
1131 name: name.clone(),
1132 }),
1133 )?;
1134 info!("create role {}", name);
1135 }
1136 Op::CreateCluster {
1137 id,
1138 name,
1139 introspection_sources,
1140 owner_id,
1141 config,
1142 } => {
1143 if is_reserved_name(&name) {
1144 return Err(AdapterError::Catalog(Error::new(
1145 ErrorKind::ReservedClusterName(name),
1146 )));
1147 }
1148 let owner_privileges = vec![rbac::owner_privilege(
1149 mz_sql::catalog::ObjectType::Cluster,
1150 owner_id,
1151 )];
1152 let default_privileges = state
1153 .default_privileges
1154 .get_applicable_privileges(
1155 owner_id,
1156 None,
1157 None,
1158 mz_sql::catalog::ObjectType::Cluster,
1159 )
1160 .map(|item| item.mz_acl_item(owner_id));
1161 let privileges: Vec<_> =
1162 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1163 .collect();
1164 let introspection_source_ids: Vec<_> = introspection_sources
1165 .iter()
1166 .map(|introspection_source| {
1167 Transaction::allocate_introspection_source_index_id(
1168 &id,
1169 introspection_source.variant,
1170 )
1171 })
1172 .collect();
1173
1174 let introspection_sources = introspection_sources
1175 .into_iter()
1176 .zip_eq(introspection_source_ids)
1177 .map(|(log, (item_id, gid))| (log, item_id, gid))
1178 .collect();
1179
1180 tx.insert_user_cluster(
1181 id,
1182 &name,
1183 introspection_sources,
1184 owner_id,
1185 privileges.clone(),
1186 config.clone().into(),
1187 &state.get_temporary_oids().collect(),
1188 )?;
1189 CatalogState::add_to_audit_log(
1190 &state.system_configuration,
1191 oracle_write_ts,
1192 session,
1193 tx,
1194 audit_events,
1195 EventType::Create,
1196 ObjectType::Cluster,
1197 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1198 id: id.to_string(),
1199 name: name.clone(),
1200 }),
1201 )?;
1202 info!("create cluster {}", name);
1203 }
1204 Op::CreateClusterReplica {
1205 cluster_id,
1206 name,
1207 config,
1208 owner_id,
1209 reason,
1210 } => {
1211 if is_reserved_name(&name) {
1212 return Err(AdapterError::Catalog(Error::new(
1213 ErrorKind::ReservedReplicaName(name),
1214 )));
1215 }
1216 let cluster = state.get_cluster(cluster_id);
1217 let id =
1218 tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1219 if let ReplicaLocation::Managed(ManagedReplicaLocation {
1220 size,
1221 billed_as,
1222 internal,
1223 ..
1224 }) = &config.location
1225 {
1226 let (reason, scheduling_policies) = reason.into_audit_log();
1227 let details = EventDetails::CreateClusterReplicaV4(
1228 mz_audit_log::CreateClusterReplicaV4 {
1229 cluster_id: cluster_id.to_string(),
1230 cluster_name: cluster.name.clone(),
1231 replica_id: Some(id.to_string()),
1232 replica_name: name.clone(),
1233 logical_size: size.clone(),
1234 billed_as: billed_as.clone(),
1235 internal: *internal,
1236 reason,
1237 scheduling_policies,
1238 },
1239 );
1240 CatalogState::add_to_audit_log(
1241 &state.system_configuration,
1242 oracle_write_ts,
1243 session,
1244 tx,
1245 audit_events,
1246 EventType::Create,
1247 ObjectType::ClusterReplica,
1248 details,
1249 )?;
1250 }
1251 }
1252 Op::CreateItem {
1253 id,
1254 name,
1255 item,
1256 owner_id,
1257 } => {
1258 state.check_unstable_dependencies(&item)?;
1259
1260 match &item {
1261 CatalogItem::Table(table) => {
1262 let gids: Vec<_> = table.global_ids().collect();
1263 assert_eq!(gids.len(), 1);
1264 storage_collections_to_create.extend(gids);
1265 }
1266 CatalogItem::Source(source) => {
1267 storage_collections_to_create.insert(source.global_id());
1268 }
1269 CatalogItem::MaterializedView(mv) => {
1270 let mv_gid = mv.global_id_writes();
1271 if let Some(target_id) = mv.replacement_target {
1272 let target_gid = state.get_entry(&target_id).latest_global_id();
1273 let shard_id =
1274 state.storage_metadata().get_collection_shard(target_gid)?;
1275 storage_collections_to_register.insert(mv_gid, shard_id);
1276 } else {
1277 storage_collections_to_create.insert(mv_gid);
1278 }
1279 }
1280 CatalogItem::ContinualTask(ct) => {
1281 storage_collections_to_create.insert(ct.global_id());
1282 }
1283 CatalogItem::Sink(sink) => {
1284 storage_collections_to_create.insert(sink.global_id());
1285 }
1286 CatalogItem::Log(_)
1287 | CatalogItem::View(_)
1288 | CatalogItem::Index(_)
1289 | CatalogItem::Type(_)
1290 | CatalogItem::Func(_)
1291 | CatalogItem::Secret(_)
1292 | CatalogItem::Connection(_) => (),
1293 }
1294
1295 let system_user = session.map_or(false, |s| s.user().is_system_user());
1296 if !system_user {
1297 if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1298 let cluster_name = state.clusters_by_id[&id].name.clone();
1299 return Err(AdapterError::Catalog(Error::new(
1300 ErrorKind::ReadOnlyCluster(cluster_name),
1301 )));
1302 }
1303 }
1304
1305 let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1306 let default_privileges = state
1307 .default_privileges
1308 .get_applicable_privileges(
1309 owner_id,
1310 name.qualifiers.database_spec.id(),
1311 Some(name.qualifiers.schema_spec.into()),
1312 item.typ().into(),
1313 )
1314 .map(|item| item.mz_acl_item(owner_id));
1315 let progress_source_privilege = if item.is_progress_source() {
1317 Some(MzAclItem {
1318 grantee: MZ_SUPPORT_ROLE_ID,
1319 grantor: owner_id,
1320 acl_mode: AclMode::SELECT,
1321 })
1322 } else {
1323 None
1324 };
1325 let privileges: Vec<_> = merge_mz_acl_items(
1326 owner_privileges
1327 .into_iter()
1328 .chain(default_privileges)
1329 .chain(progress_source_privilege),
1330 )
1331 .collect();
1332
1333 let temporary_oids = state.get_temporary_oids().collect();
1334
1335 if item.is_temporary() {
1336 if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1337 || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1338 {
1339 return Err(AdapterError::Catalog(Error::new(
1340 ErrorKind::InvalidTemporarySchema,
1341 )));
1342 }
1343 let oid = tx.allocate_oid(&temporary_oids)?;
1344
1345 let schema_id = name.qualifiers.schema_spec.clone().into();
1346 let item_type = item.typ();
1347 let (create_sql, global_id, versions) = item.to_serialized();
1348
1349 let item = TemporaryItem {
1350 id,
1351 oid,
1352 global_id,
1353 schema_id,
1354 name: name.item.clone(),
1355 create_sql,
1356 conn_id: item.conn_id().cloned(),
1357 owner_id,
1358 privileges: privileges.clone(),
1359 extra_versions: versions,
1360 };
1361 temporary_item_updates.push((item, StateDiff::Addition));
1362
1363 info!(
1364 "create temporary {} {} ({})",
1365 item_type,
1366 state.resolve_full_name(&name, None),
1367 id
1368 );
1369 } else {
1370 if let Some(temp_id) =
1371 item.uses()
1372 .iter()
1373 .find(|id| match state.try_get_entry(*id) {
1374 Some(entry) => entry.item().is_temporary(),
1375 None => temporary_ids.contains(id),
1376 })
1377 {
1378 let temp_item = state.get_entry(temp_id);
1379 return Err(AdapterError::Catalog(Error::new(
1380 ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1381 )));
1382 }
1383 if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1384 && !system_user
1385 {
1386 let schema_name = state
1387 .resolve_full_name(&name, session.map(|session| session.conn_id()))
1388 .schema;
1389 return Err(AdapterError::Catalog(Error::new(
1390 ErrorKind::ReadOnlySystemSchema(schema_name),
1391 )));
1392 }
1393 let schema_id = name.qualifiers.schema_spec.clone().into();
1394 let item_type = item.typ();
1395 let (create_sql, global_id, versions) = item.to_serialized();
1396 tx.insert_user_item(
1397 id,
1398 global_id,
1399 schema_id,
1400 &name.item,
1401 create_sql,
1402 owner_id,
1403 privileges.clone(),
1404 &temporary_oids,
1405 versions,
1406 )?;
1407 info!(
1408 "create {} {} ({})",
1409 item_type,
1410 state.resolve_full_name(&name, None),
1411 id
1412 );
1413 }
1414
1415 if Self::should_audit_log_item(&item) {
1416 let name = Self::full_name_detail(
1417 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1418 );
1419 let details = match &item {
1420 CatalogItem::Source(s) => {
1421 let cluster_id = match s.data_source {
1422 DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1425 match state.get_entry(&ingestion_id).cluster_id() {
1426 Some(cluster_id) => Some(cluster_id.to_string()),
1427 None => None,
1428 }
1429 }
1430 _ => match item.cluster_id() {
1431 Some(cluster_id) => Some(cluster_id.to_string()),
1432 None => None,
1433 },
1434 };
1435
1436 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1437 id: id.to_string(),
1438 cluster_id,
1439 name,
1440 external_type: s.source_type().to_string(),
1441 })
1442 }
1443 CatalogItem::Sink(s) => {
1444 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1445 id: id.to_string(),
1446 cluster_id: Some(s.cluster_id.to_string()),
1447 name,
1448 external_type: s.sink_type().to_string(),
1449 })
1450 }
1451 CatalogItem::Index(i) => {
1452 EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1453 id: id.to_string(),
1454 name,
1455 cluster_id: i.cluster_id.to_string(),
1456 })
1457 }
1458 CatalogItem::MaterializedView(mv) => {
1459 EventDetails::CreateMaterializedViewV1(
1460 mz_audit_log::CreateMaterializedViewV1 {
1461 id: id.to_string(),
1462 name,
1463 cluster_id: mv.cluster_id.to_string(),
1464 replacement_target_id: mv
1465 .replacement_target
1466 .map(|id| id.to_string()),
1467 },
1468 )
1469 }
1470 _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1471 id: id.to_string(),
1472 name,
1473 }),
1474 };
1475 CatalogState::add_to_audit_log(
1476 &state.system_configuration,
1477 oracle_write_ts,
1478 session,
1479 tx,
1480 audit_events,
1481 EventType::Create,
1482 catalog_type_to_audit_object_type(item.typ()),
1483 details,
1484 )?;
1485 }
1486 }
1487 Op::CreateNetworkPolicy {
1488 rules,
1489 name,
1490 owner_id,
1491 } => {
1492 if state.network_policies_by_name.contains_key(&name) {
1493 return Err(AdapterError::PlanError(PlanError::Catalog(
1494 SqlCatalogError::NetworkPolicyAlreadyExists(name),
1495 )));
1496 }
1497 if is_reserved_name(&name) {
1498 return Err(AdapterError::Catalog(Error::new(
1499 ErrorKind::ReservedNetworkPolicyName(name),
1500 )));
1501 }
1502
1503 let owner_privileges = vec![rbac::owner_privilege(
1504 mz_sql::catalog::ObjectType::NetworkPolicy,
1505 owner_id,
1506 )];
1507 let default_privileges = state
1508 .default_privileges
1509 .get_applicable_privileges(
1510 owner_id,
1511 None,
1512 None,
1513 mz_sql::catalog::ObjectType::NetworkPolicy,
1514 )
1515 .map(|item| item.mz_acl_item(owner_id));
1516 let privileges: Vec<_> =
1517 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1518 .collect();
1519
1520 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1521 let id = tx.insert_user_network_policy(
1522 name.clone(),
1523 rules,
1524 privileges,
1525 owner_id,
1526 &temporary_oids,
1527 )?;
1528
1529 CatalogState::add_to_audit_log(
1530 &state.system_configuration,
1531 oracle_write_ts,
1532 session,
1533 tx,
1534 audit_events,
1535 EventType::Create,
1536 ObjectType::NetworkPolicy,
1537 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1538 id: id.to_string(),
1539 name: name.clone(),
1540 }),
1541 )?;
1542
1543 info!("created network policy {name} ({id})");
1544 }
1545 Op::Comment {
1546 object_id,
1547 sub_component,
1548 comment,
1549 } => {
1550 tx.update_comment(object_id, sub_component, comment)?;
1551 let entry = state.get_comment_id_entry(&object_id);
1552 let should_log = entry
1553 .map(|entry| Self::should_audit_log_item(entry.item()))
1554 .unwrap_or(true);
1556 if let (Some(conn_id), true) =
1559 (session.map(|session| session.conn_id()), should_log)
1560 {
1561 CatalogState::add_to_audit_log(
1562 &state.system_configuration,
1563 oracle_write_ts,
1564 session,
1565 tx,
1566 audit_events,
1567 EventType::Comment,
1568 comment_id_to_audit_object_type(object_id),
1569 EventDetails::IdNameV1(IdNameV1 {
1570 id: format!("{object_id:?}"),
1572 name: state.comment_id_to_audit_log_name(object_id, conn_id),
1573 }),
1574 )?;
1575 }
1576 }
1577 Op::UpdateSourceReferences {
1578 source_id,
1579 references,
1580 } => {
1581 tx.update_source_references(
1582 source_id,
1583 references
1584 .references
1585 .into_iter()
1586 .map(|reference| reference.into())
1587 .collect(),
1588 references.updated_at,
1589 )?;
1590 }
1591 Op::DropObjects(drop_object_infos) => {
1592 let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1594
1595 tx.drop_comments(&delta.comments)?;
1597
1598 let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1600 delta
1601 .items
1602 .iter()
1603 .map(|id| id)
1604 .partition(|id| !state.get_entry(*id).item().is_temporary());
1605 tx.remove_items(&durable_items_to_drop)?;
1606 temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1607 let entry = state.get_entry(&id);
1608 (entry.clone().into(), StateDiff::Retraction)
1609 }));
1610
1611 for item_id in delta.items {
1612 let entry = state.get_entry(&item_id);
1613
1614 if entry.item().is_storage_collection() && entry.replacement_target().is_none()
1618 {
1619 storage_collections_to_drop.extend(entry.global_ids());
1620 }
1621
1622 if state.source_references.contains_key(&item_id) {
1623 tx.remove_source_references(item_id)?;
1624 }
1625
1626 if Self::should_audit_log_item(entry.item()) {
1627 CatalogState::add_to_audit_log(
1628 &state.system_configuration,
1629 oracle_write_ts,
1630 session,
1631 tx,
1632 audit_events,
1633 EventType::Drop,
1634 catalog_type_to_audit_object_type(entry.item().typ()),
1635 EventDetails::IdFullNameV1(IdFullNameV1 {
1636 id: item_id.to_string(),
1637 name: Self::full_name_detail(&state.resolve_full_name(
1638 entry.name(),
1639 session.map(|session| session.conn_id()),
1640 )),
1641 }),
1642 )?;
1643 }
1644 info!(
1645 "drop {} {} ({})",
1646 entry.item_type(),
1647 state.resolve_full_name(entry.name(), entry.conn_id()),
1648 item_id
1649 );
1650 }
1651
1652 let schemas = delta
1654 .schemas
1655 .iter()
1656 .map(|(schema_spec, database_spec)| {
1657 (SchemaId::from(schema_spec), *database_spec)
1658 })
1659 .collect();
1660 tx.remove_schemas(&schemas)?;
1661
1662 for (schema_spec, database_spec) in delta.schemas {
1663 let schema = state.get_schema(
1664 &database_spec,
1665 &schema_spec,
1666 session
1667 .map(|session| session.conn_id())
1668 .unwrap_or(&SYSTEM_CONN_ID),
1669 );
1670
1671 let schema_id = SchemaId::from(schema_spec);
1672 let database_id = match database_spec {
1673 ResolvedDatabaseSpecifier::Ambient => None,
1674 ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1675 };
1676
1677 CatalogState::add_to_audit_log(
1678 &state.system_configuration,
1679 oracle_write_ts,
1680 session,
1681 tx,
1682 audit_events,
1683 EventType::Drop,
1684 ObjectType::Schema,
1685 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1686 id: schema_id.to_string(),
1687 name: schema.name.schema.to_string(),
1688 database_name: database_id
1689 .map(|database_id| state.database_by_id[&database_id].name.clone()),
1690 }),
1691 )?;
1692 }
1693
1694 tx.remove_databases(&delta.databases)?;
1696
1697 for database_id in delta.databases {
1698 let database = state.get_database(&database_id).clone();
1699
1700 CatalogState::add_to_audit_log(
1701 &state.system_configuration,
1702 oracle_write_ts,
1703 session,
1704 tx,
1705 audit_events,
1706 EventType::Drop,
1707 ObjectType::Database,
1708 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1709 id: database_id.to_string(),
1710 name: database.name.clone(),
1711 }),
1712 )?;
1713 }
1714
1715 tx.remove_user_roles(&delta.roles)?;
1717
1718 for role_id in delta.roles {
1719 let role = state
1720 .roles_by_id
1721 .get(&role_id)
1722 .expect("catalog out of sync");
1723
1724 CatalogState::add_to_audit_log(
1725 &state.system_configuration,
1726 oracle_write_ts,
1727 session,
1728 tx,
1729 audit_events,
1730 EventType::Drop,
1731 ObjectType::Role,
1732 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1733 id: role.id.to_string(),
1734 name: role.name.clone(),
1735 }),
1736 )?;
1737 info!("drop role {}", role.name());
1738 }
1739
1740 tx.remove_network_policies(&delta.network_policies)?;
1742
1743 for network_policy_id in delta.network_policies {
1744 let policy = state
1745 .network_policies_by_id
1746 .get(&network_policy_id)
1747 .expect("catalog out of sync");
1748
1749 CatalogState::add_to_audit_log(
1750 &state.system_configuration,
1751 oracle_write_ts,
1752 session,
1753 tx,
1754 audit_events,
1755 EventType::Drop,
1756 ObjectType::NetworkPolicy,
1757 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1758 id: policy.id.to_string(),
1759 name: policy.name.clone(),
1760 }),
1761 )?;
1762 info!("drop network policy {}", policy.name.clone());
1763 }
1764
1765 let replicas = delta.replicas.keys().copied().collect();
1767 tx.remove_cluster_replicas(&replicas)?;
1768
1769 for (replica_id, (cluster_id, reason)) in delta.replicas {
1770 let cluster = state.get_cluster(cluster_id);
1771 let replica = cluster.replica(replica_id).expect("Must exist");
1772
1773 let (reason, scheduling_policies) = reason.into_audit_log();
1774 let details =
1775 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1776 cluster_id: cluster_id.to_string(),
1777 cluster_name: cluster.name.clone(),
1778 replica_id: Some(replica_id.to_string()),
1779 replica_name: replica.name.clone(),
1780 reason,
1781 scheduling_policies,
1782 });
1783 CatalogState::add_to_audit_log(
1784 &state.system_configuration,
1785 oracle_write_ts,
1786 session,
1787 tx,
1788 audit_events,
1789 EventType::Drop,
1790 ObjectType::ClusterReplica,
1791 details,
1792 )?;
1793 }
1794
1795 tx.remove_clusters(&delta.clusters)?;
1797
1798 for cluster_id in delta.clusters {
1799 let cluster = state.get_cluster(cluster_id);
1800
1801 CatalogState::add_to_audit_log(
1802 &state.system_configuration,
1803 oracle_write_ts,
1804 session,
1805 tx,
1806 audit_events,
1807 EventType::Drop,
1808 ObjectType::Cluster,
1809 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1810 id: cluster.id.to_string(),
1811 name: cluster.name.clone(),
1812 }),
1813 )?;
1814 }
1815 }
1816 Op::GrantRole {
1817 role_id,
1818 member_id,
1819 grantor_id,
1820 } => {
1821 state.ensure_not_reserved_role(&member_id)?;
1822 state.ensure_grantable_role(&role_id)?;
1823 if state.collect_role_membership(&role_id).contains(&member_id) {
1824 let group_role = state.get_role(&role_id);
1825 let member_role = state.get_role(&member_id);
1826 return Err(AdapterError::Catalog(Error::new(
1827 ErrorKind::CircularRoleMembership {
1828 role_name: group_role.name().to_string(),
1829 member_name: member_role.name().to_string(),
1830 },
1831 )));
1832 }
1833 let mut member_role = state.get_role(&member_id).clone();
1834 member_role.membership.map.insert(role_id, grantor_id);
1835 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1836
1837 CatalogState::add_to_audit_log(
1838 &state.system_configuration,
1839 oracle_write_ts,
1840 session,
1841 tx,
1842 audit_events,
1843 EventType::Grant,
1844 ObjectType::Role,
1845 EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1846 role_id: role_id.to_string(),
1847 member_id: member_id.to_string(),
1848 grantor_id: grantor_id.to_string(),
1849 executed_by: session
1850 .map(|session| session.authenticated_role_id())
1851 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1852 .to_string(),
1853 }),
1854 )?;
1855 }
1856 Op::RevokeRole {
1857 role_id,
1858 member_id,
1859 grantor_id,
1860 } => {
1861 state.ensure_not_reserved_role(&member_id)?;
1862 state.ensure_grantable_role(&role_id)?;
1863 let mut member_role = state.get_role(&member_id).clone();
1864 member_role.membership.map.remove(&role_id);
1865 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1866
1867 CatalogState::add_to_audit_log(
1868 &state.system_configuration,
1869 oracle_write_ts,
1870 session,
1871 tx,
1872 audit_events,
1873 EventType::Revoke,
1874 ObjectType::Role,
1875 EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1876 role_id: role_id.to_string(),
1877 member_id: member_id.to_string(),
1878 grantor_id: grantor_id.to_string(),
1879 executed_by: session
1880 .map(|session| session.authenticated_role_id())
1881 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1882 .to_string(),
1883 }),
1884 )?;
1885 }
1886 Op::UpdatePrivilege {
1887 target_id,
1888 privilege,
1889 variant,
1890 } => {
1891 let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1892 UpdatePrivilegeVariant::Grant => {
1893 privileges.grant(privilege);
1894 }
1895 UpdatePrivilegeVariant::Revoke => {
1896 privileges.revoke(&privilege);
1897 }
1898 };
1899 match &target_id {
1900 SystemObjectId::Object(object_id) => match object_id {
1901 ObjectId::Cluster(id) => {
1902 let mut cluster = state.get_cluster(*id).clone();
1903 update_privilege_fn(&mut cluster.privileges);
1904 tx.update_cluster(*id, cluster.into())?;
1905 }
1906 ObjectId::Database(id) => {
1907 let mut database = state.get_database(id).clone();
1908 update_privilege_fn(&mut database.privileges);
1909 tx.update_database(*id, database.into())?;
1910 }
1911 ObjectId::NetworkPolicy(id) => {
1912 let mut policy = state.get_network_policy(id).clone();
1913 update_privilege_fn(&mut policy.privileges);
1914 tx.update_network_policy(*id, policy.into())?;
1915 }
1916 ObjectId::Schema((database_spec, schema_spec)) => {
1917 let schema_id = schema_spec.clone().into();
1918 let mut schema = state
1919 .get_schema(
1920 database_spec,
1921 schema_spec,
1922 session
1923 .map(|session| session.conn_id())
1924 .unwrap_or(&SYSTEM_CONN_ID),
1925 )
1926 .clone();
1927 update_privilege_fn(&mut schema.privileges);
1928 tx.update_schema(schema_id, schema.into())?;
1929 }
1930 ObjectId::Item(id) => {
1931 let entry = state.get_entry(id);
1932 let mut new_entry = entry.clone();
1933 update_privilege_fn(&mut new_entry.privileges);
1934 if !new_entry.item().is_temporary() {
1935 tx.update_item(*id, new_entry.into())?;
1936 } else {
1937 temporary_item_updates
1938 .push((entry.clone().into(), StateDiff::Retraction));
1939 temporary_item_updates
1940 .push((new_entry.into(), StateDiff::Addition));
1941 }
1942 }
1943 ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
1944 },
1945 SystemObjectId::System => {
1946 let mut system_privileges = PrivilegeMap::clone(&state.system_privileges);
1947 update_privilege_fn(&mut system_privileges);
1948 let new_privilege =
1949 system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
1950 tx.set_system_privilege(
1951 privilege.grantee,
1952 privilege.grantor,
1953 new_privilege.map(|new_privilege| new_privilege.acl_mode),
1954 )?;
1955 }
1956 }
1957 let object_type = state.get_system_object_type(&target_id);
1958 let object_id_str = match &target_id {
1959 SystemObjectId::System => "SYSTEM".to_string(),
1960 SystemObjectId::Object(id) => id.to_string(),
1961 };
1962 CatalogState::add_to_audit_log(
1963 &state.system_configuration,
1964 oracle_write_ts,
1965 session,
1966 tx,
1967 audit_events,
1968 variant.into(),
1969 system_object_type_to_audit_object_type(&object_type),
1970 EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
1971 object_id: object_id_str,
1972 grantee_id: privilege.grantee.to_string(),
1973 grantor_id: privilege.grantor.to_string(),
1974 privileges: privilege.acl_mode.to_string(),
1975 }),
1976 )?;
1977 }
1978 Op::UpdateDefaultPrivilege {
1979 privilege_object,
1980 privilege_acl_item,
1981 variant,
1982 } => {
1983 let mut default_privileges = DefaultPrivileges::clone(&state.default_privileges);
1984 match variant {
1985 UpdatePrivilegeVariant::Grant => default_privileges
1986 .grant(privilege_object.clone(), privilege_acl_item.clone()),
1987 UpdatePrivilegeVariant::Revoke => {
1988 default_privileges.revoke(&privilege_object, &privilege_acl_item)
1989 }
1990 }
1991 let new_acl_mode = default_privileges
1992 .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
1993 tx.set_default_privilege(
1994 privilege_object.role_id,
1995 privilege_object.database_id,
1996 privilege_object.schema_id,
1997 privilege_object.object_type,
1998 privilege_acl_item.grantee,
1999 new_acl_mode.cloned(),
2000 )?;
2001 CatalogState::add_to_audit_log(
2002 &state.system_configuration,
2003 oracle_write_ts,
2004 session,
2005 tx,
2006 audit_events,
2007 variant.into(),
2008 object_type_to_audit_object_type(privilege_object.object_type),
2009 EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
2010 role_id: privilege_object.role_id.to_string(),
2011 database_id: privilege_object.database_id.map(|id| id.to_string()),
2012 schema_id: privilege_object.schema_id.map(|id| id.to_string()),
2013 grantee_id: privilege_acl_item.grantee.to_string(),
2014 privileges: privilege_acl_item.acl_mode.to_string(),
2015 }),
2016 )?;
2017 }
2018 Op::RenameCluster {
2019 id,
2020 name,
2021 to_name,
2022 check_reserved_names,
2023 } => {
2024 if id.is_system() {
2025 return Err(AdapterError::Catalog(Error::new(
2026 ErrorKind::ReadOnlyCluster(name.clone()),
2027 )));
2028 }
2029 if check_reserved_names && is_reserved_name(&to_name) {
2030 return Err(AdapterError::Catalog(Error::new(
2031 ErrorKind::ReservedClusterName(to_name),
2032 )));
2033 }
2034 tx.rename_cluster(id, &name, &to_name)?;
2035 CatalogState::add_to_audit_log(
2036 &state.system_configuration,
2037 oracle_write_ts,
2038 session,
2039 tx,
2040 audit_events,
2041 EventType::Alter,
2042 ObjectType::Cluster,
2043 EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
2044 id: id.to_string(),
2045 old_name: name.clone(),
2046 new_name: to_name.clone(),
2047 }),
2048 )?;
2049 info!("rename cluster {name} to {to_name}");
2050 }
2051 Op::RenameClusterReplica {
2052 cluster_id,
2053 replica_id,
2054 name,
2055 to_name,
2056 } => {
2057 if is_reserved_name(&to_name) {
2058 return Err(AdapterError::Catalog(Error::new(
2059 ErrorKind::ReservedReplicaName(to_name),
2060 )));
2061 }
2062 tx.rename_cluster_replica(replica_id, &name, &to_name)?;
2063 CatalogState::add_to_audit_log(
2064 &state.system_configuration,
2065 oracle_write_ts,
2066 session,
2067 tx,
2068 audit_events,
2069 EventType::Alter,
2070 ObjectType::ClusterReplica,
2071 EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
2072 cluster_id: cluster_id.to_string(),
2073 replica_id: replica_id.to_string(),
2074 old_name: name.replica.as_str().to_string(),
2075 new_name: to_name.clone(),
2076 }),
2077 )?;
2078 info!("rename cluster replica {name} to {to_name}");
2079 }
2080 Op::RenameItem {
2081 id,
2082 to_name,
2083 current_full_name,
2084 } => {
2085 let mut updates = Vec::new();
2086
2087 let entry = state.get_entry(&id);
2088 if let CatalogItem::Type(_) = entry.item() {
2089 return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
2090 current_full_name.to_string(),
2091 ))));
2092 }
2093
2094 if entry.id().is_system() {
2095 let name = state
2096 .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2097 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2098 name.to_string(),
2099 ))));
2100 }
2101
2102 let mut to_full_name = current_full_name.clone();
2103 to_full_name.item.clone_from(&to_name);
2104
2105 let mut to_qualified_name = entry.name().clone();
2106 to_qualified_name.item.clone_from(&to_name);
2107
2108 let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2109 id: id.to_string(),
2110 old_name: Self::full_name_detail(¤t_full_name),
2111 new_name: Self::full_name_detail(&to_full_name),
2112 });
2113 if Self::should_audit_log_item(entry.item()) {
2114 CatalogState::add_to_audit_log(
2115 &state.system_configuration,
2116 oracle_write_ts,
2117 session,
2118 tx,
2119 audit_events,
2120 EventType::Alter,
2121 catalog_type_to_audit_object_type(entry.item().typ()),
2122 details,
2123 )?;
2124 }
2125
2126 let mut new_entry = entry.clone();
2128 new_entry.name.item.clone_from(&to_name);
2129 new_entry.item = entry
2130 .item()
2131 .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2132 .map_err(|e| {
2133 Error::new(ErrorKind::from(AmbiguousRename {
2134 depender: state
2135 .resolve_full_name(entry.name(), entry.conn_id())
2136 .to_string(),
2137 dependee: state
2138 .resolve_full_name(entry.name(), entry.conn_id())
2139 .to_string(),
2140 message: e,
2141 }))
2142 })?;
2143
2144 for id in entry.referenced_by() {
2145 let dependent_item = state.get_entry(id);
2146 let mut to_entry = dependent_item.clone();
2147 to_entry.item = dependent_item
2148 .item()
2149 .rename_item_refs(
2150 current_full_name.clone(),
2151 to_full_name.item.clone(),
2152 false,
2153 )
2154 .map_err(|e| {
2155 Error::new(ErrorKind::from(AmbiguousRename {
2156 depender: state
2157 .resolve_full_name(
2158 dependent_item.name(),
2159 dependent_item.conn_id(),
2160 )
2161 .to_string(),
2162 dependee: state
2163 .resolve_full_name(entry.name(), entry.conn_id())
2164 .to_string(),
2165 message: e,
2166 }))
2167 })?;
2168
2169 if !to_entry.item().is_temporary() {
2170 tx.update_item(*id, to_entry.into())?;
2171 } else {
2172 temporary_item_updates
2173 .push((dependent_item.clone().into(), StateDiff::Retraction));
2174 temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2175 }
2176 updates.push(*id);
2177 }
2178 if !new_entry.item().is_temporary() {
2179 tx.update_item(id, new_entry.into())?;
2180 } else {
2181 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2182 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2183 }
2184
2185 updates.push(id);
2186 for id in updates {
2187 Self::log_update(state, &id);
2188 }
2189 }
// Renames a schema inside a user database, rewriting schema references in every item of
// the schema and in every item those items are referenced by, then audit logs the rename.
Op::RenameSchema {
    database_spec,
    schema_spec,
    new_name,
    check_reserved_names,
} => {
    // The reserved-name check is only applied when the caller asked for it.
    if check_reserved_names && is_reserved_name(&new_name) {
        return Err(AdapterError::Catalog(Error::new(
            ErrorKind::ReservedSchemaName(new_name),
        )));
    }

    // Fall back to the system connection when there is no session.
    let conn_id = session
        .map(|session| session.conn_id())
        .unwrap_or(&SYSTEM_CONN_ID);

    let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
    let cur_name = schema.name().schema.clone();

    // Schemas of the ambient database cannot be renamed.
    let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
        return Err(AdapterError::Catalog(Error::new(
            ErrorKind::AmbientSchemaRename(cur_name),
        )));
    };
    let database = state.get_database(&database_id);
    let database_name = &database.name;

    // `updates` lists every touched item for logging; `items_to_update` holds the durable
    // items that are written in one batch below.
    let mut updates = Vec::new();
    let mut items_to_update = BTreeMap::new();

    // Rewrites the schema references of one item and stages the result.
    let mut update_item = |id| {
        // Skip items already staged in the durable batch.
        if items_to_update.contains_key(id) {
            return Ok(());
        }

        let entry = state.get_entry(id);

        let mut new_entry = entry.clone();
        new_entry.item = entry
            .item
            .rename_schema_refs(database_name, &cur_name, &new_name)
            .map_err(|(s, _i)| {
                Error::new(ErrorKind::from(AmbiguousRename {
                    depender: state
                        .resolve_full_name(entry.name(), entry.conn_id())
                        .to_string(),
                    dependee: format!("{database_name}.{cur_name}"),
                    message: format!("ambiguous reference to schema named {s}"),
                }))
            })?;

        // Durable items go into the batch; temporary items are recorded as a
        // retraction/addition pair.
        if !new_entry.item().is_temporary() {
            items_to_update.insert(*id, new_entry.into());
        } else {
            temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
            temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
        }
        updates.push(id);

        Ok::<_, AdapterError>(())
    };

    for (_name, item_id) in &schema.items {
        // Update the item itself.
        update_item(item_id)?;

        // Update every item that references it.
        for id in state.get_entry(item_id).referenced_by() {
            update_item(id)?;
        }
    }
    tx.update_items(items_to_update)?;

    // Schemas whose specifier is not `SchemaSpecifier::Id` are reported as read-only
    // system schemas.
    let SchemaSpecifier::Id(schema_id) = *schema.id() else {
        let schema_name = schema.name().schema.clone();
        return Err(AdapterError::Catalog(crate::catalog::Error::new(
            crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
        )));
    };

    // Audit log the rename. This shadows the borrowed `database_name` above with an owned
    // `Option` for the event.
    let database_name = database_spec
        .id()
        .map(|id| state.get_database(&id).name.clone());
    let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
        id: schema_id.to_string(),
        old_name: schema.name().schema.clone(),
        new_name: new_name.clone(),
        database_name,
    });
    CatalogState::add_to_audit_log(
        &state.system_configuration,
        oracle_write_ts,
        session,
        tx,
        audit_events,
        EventType::Alter,
        mz_audit_log::ObjectType::Schema,
        details,
    )?;

    // Finally, write the schema itself under its new name.
    let mut new_schema = schema.clone();
    new_schema.name.schema.clone_from(&new_name);
    tx.update_schema(schema_id, new_schema.into())?;

    for id in updates {
        Self::log_update(state, id);
    }
}
2306 Op::UpdateOwner { id, new_owner } => {
2307 let conn_id = session
2308 .map(|session| session.conn_id())
2309 .unwrap_or(&SYSTEM_CONN_ID);
2310 let old_owner = state
2311 .get_owner_id(&id, conn_id)
2312 .expect("cannot update the owner of an object without an owner");
2313 match &id {
2314 ObjectId::Cluster(id) => {
2315 let mut cluster = state.get_cluster(*id).clone();
2316 if id.is_system() {
2317 return Err(AdapterError::Catalog(Error::new(
2318 ErrorKind::ReadOnlyCluster(cluster.name),
2319 )));
2320 }
2321 Self::update_privilege_owners(
2322 &mut cluster.privileges,
2323 cluster.owner_id,
2324 new_owner,
2325 );
2326 cluster.owner_id = new_owner;
2327 tx.update_cluster(*id, cluster.into())?;
2328 }
2329 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2330 let cluster = state.get_cluster(*cluster_id);
2331 let mut replica = cluster
2332 .replica(*replica_id)
2333 .expect("catalog out of sync")
2334 .clone();
2335 if replica_id.is_system() {
2336 return Err(AdapterError::Catalog(Error::new(
2337 ErrorKind::ReadOnlyClusterReplica(replica.name),
2338 )));
2339 }
2340 replica.owner_id = new_owner;
2341 tx.update_cluster_replica(*replica_id, replica.into())?;
2342 }
2343 ObjectId::Database(id) => {
2344 let mut database = state.get_database(id).clone();
2345 if id.is_system() {
2346 return Err(AdapterError::Catalog(Error::new(
2347 ErrorKind::ReadOnlyDatabase(database.name),
2348 )));
2349 }
2350 Self::update_privilege_owners(
2351 &mut database.privileges,
2352 database.owner_id,
2353 new_owner,
2354 );
2355 database.owner_id = new_owner;
2356 tx.update_database(*id, database.clone().into())?;
2357 }
2358 ObjectId::Schema((database_spec, schema_spec)) => {
2359 let schema_id: SchemaId = schema_spec.clone().into();
2360 let mut schema = state
2361 .get_schema(database_spec, schema_spec, conn_id)
2362 .clone();
2363 if schema_id.is_system() {
2364 let name = schema.name();
2365 let full_name = state.resolve_full_schema_name(name);
2366 return Err(AdapterError::Catalog(Error::new(
2367 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2368 )));
2369 }
2370 Self::update_privilege_owners(
2371 &mut schema.privileges,
2372 schema.owner_id,
2373 new_owner,
2374 );
2375 schema.owner_id = new_owner;
2376 tx.update_schema(schema_id, schema.into())?;
2377 }
2378 ObjectId::Item(id) => {
2379 let entry = state.get_entry(id);
2380 let mut new_entry = entry.clone();
2381 if id.is_system() {
2382 let full_name = state.resolve_full_name(
2383 new_entry.name(),
2384 session.map(|session| session.conn_id()),
2385 );
2386 return Err(AdapterError::Catalog(Error::new(
2387 ErrorKind::ReadOnlyItem(full_name.to_string()),
2388 )));
2389 }
2390 Self::update_privilege_owners(
2391 &mut new_entry.privileges,
2392 new_entry.owner_id,
2393 new_owner,
2394 );
2395 new_entry.owner_id = new_owner;
2396 if !new_entry.item().is_temporary() {
2397 tx.update_item(*id, new_entry.into())?;
2398 } else {
2399 temporary_item_updates
2400 .push((entry.clone().into(), StateDiff::Retraction));
2401 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2402 }
2403 }
2404 ObjectId::NetworkPolicy(id) => {
2405 let mut policy = state.get_network_policy(id).clone();
2406 if id.is_system() {
2407 return Err(AdapterError::Catalog(Error::new(
2408 ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2409 )));
2410 }
2411 Self::update_privilege_owners(
2412 &mut policy.privileges,
2413 policy.owner_id,
2414 new_owner,
2415 );
2416 policy.owner_id = new_owner;
2417 tx.update_network_policy(*id, policy.into())?;
2418 }
2419 ObjectId::Role(_) => unreachable!("roles have no owner"),
2420 }
2421 let object_type = state.get_object_type(&id);
2422 CatalogState::add_to_audit_log(
2423 &state.system_configuration,
2424 oracle_write_ts,
2425 session,
2426 tx,
2427 audit_events,
2428 EventType::Alter,
2429 object_type_to_audit_object_type(object_type),
2430 EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2431 object_id: id.to_string(),
2432 old_owner_id: old_owner.to_string(),
2433 new_owner_id: new_owner.to_string(),
2434 }),
2435 )?;
2436 }
2437 Op::UpdateClusterConfig { id, name, config } => {
2438 let mut cluster = state.get_cluster(id).clone();
2439 cluster.config = config;
2440 tx.update_cluster(id, cluster.into())?;
2441 info!("update cluster {}", name);
2442
2443 CatalogState::add_to_audit_log(
2444 &state.system_configuration,
2445 oracle_write_ts,
2446 session,
2447 tx,
2448 audit_events,
2449 EventType::Alter,
2450 ObjectType::Cluster,
2451 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2452 id: id.to_string(),
2453 name,
2454 }),
2455 )?;
2456 }
2457 Op::UpdateClusterReplicaConfig {
2458 replica_id,
2459 cluster_id,
2460 config,
2461 } => {
2462 let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2463 info!("update replica {}", replica.name);
2464 tx.update_cluster_replica(
2465 replica_id,
2466 mz_catalog::durable::ClusterReplica {
2467 cluster_id,
2468 replica_id,
2469 name: replica.name.clone(),
2470 config: config.clone().into(),
2471 owner_id: replica.owner_id,
2472 },
2473 )?;
2474 }
2475 Op::UpdateItem { id, name, to_item } => {
2476 let mut entry = state.get_entry(&id).clone();
2477 entry.name = name.clone();
2478 entry.item = to_item.clone();
2479 tx.update_item(id, entry.into())?;
2480
2481 if Self::should_audit_log_item(&to_item) {
2482 let mut full_name = Self::full_name_detail(
2483 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2484 );
2485 full_name.item = name.item;
2486
2487 CatalogState::add_to_audit_log(
2488 &state.system_configuration,
2489 oracle_write_ts,
2490 session,
2491 tx,
2492 audit_events,
2493 EventType::Alter,
2494 catalog_type_to_audit_object_type(to_item.typ()),
2495 EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2496 id: id.to_string(),
2497 name: full_name,
2498 }),
2499 )?;
2500 }
2501
2502 Self::log_update(state, &id);
2503 }
2504 Op::UpdateSystemConfiguration { name, value } => {
2505 let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2506 tx.upsert_system_config(&name, parsed_value.clone())?;
2507 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2512 let with_0dt_deployment_max_wait =
2513 Duration::parse(VarInput::Flat(&parsed_value))
2514 .expect("parsing succeeded above");
2515 tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2516 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2517 let with_0dt_deployment_ddl_check_interval =
2518 Duration::parse(VarInput::Flat(&parsed_value))
2519 .expect("parsing succeeded above");
2520 tx.set_0dt_deployment_ddl_check_interval(
2521 with_0dt_deployment_ddl_check_interval,
2522 )?;
2523 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2524 let panic_after_timeout =
2525 strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2526 tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2527 }
2528
2529 CatalogState::add_to_audit_log(
2530 &state.system_configuration,
2531 oracle_write_ts,
2532 session,
2533 tx,
2534 audit_events,
2535 EventType::Alter,
2536 ObjectType::System,
2537 EventDetails::SetV1(mz_audit_log::SetV1 {
2538 name,
2539 value: Some(value.borrow().to_vec().join(", ")),
2540 }),
2541 )?;
2542 }
2543 Op::ResetSystemConfiguration { name } => {
2544 tx.remove_system_config(&name);
2545 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2550 tx.reset_0dt_deployment_max_wait()?;
2551 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2552 tx.reset_0dt_deployment_ddl_check_interval()?;
2553 }
2554
2555 CatalogState::add_to_audit_log(
2556 &state.system_configuration,
2557 oracle_write_ts,
2558 session,
2559 tx,
2560 audit_events,
2561 EventType::Alter,
2562 ObjectType::System,
2563 EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2564 )?;
2565 }
2566 Op::ResetAllSystemConfiguration => {
2567 tx.clear_system_configs();
2568 tx.reset_0dt_deployment_max_wait()?;
2569 tx.reset_0dt_deployment_ddl_check_interval()?;
2570
2571 CatalogState::add_to_audit_log(
2572 &state.system_configuration,
2573 oracle_write_ts,
2574 session,
2575 tx,
2576 audit_events,
2577 EventType::Alter,
2578 ObjectType::System,
2579 EventDetails::ResetAllV1,
2580 )?;
2581 }
2582 Op::WeirdStorageUsageUpdates {
2583 object_id,
2584 size_bytes,
2585 collection_timestamp,
2586 } => {
2587 let id = tx.allocate_storage_usage_ids()?;
2588 let metric =
2589 VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2590 let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2591 let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2592 weird_builtin_table_update = Some(builtin_table_update);
2593 }
2594 };
2595 Ok((weird_builtin_table_update, temporary_item_updates))
2596 }
2597
2598 fn log_update(state: &CatalogState, id: &CatalogItemId) {
2599 let entry = state.get_entry(id);
2600 info!(
2601 "update {} {} ({})",
2602 entry.item_type(),
2603 state.resolve_full_name(entry.name(), entry.conn_id()),
2604 id
2605 );
2606 }
2607
2608 fn update_privilege_owners(
2612 privileges: &mut PrivilegeMap,
2613 old_owner: RoleId,
2614 new_owner: RoleId,
2615 ) {
2616 let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2618
2619 let mut new_present = false;
2620 for privilege in flat_privileges.iter_mut() {
2621 if privilege.grantor == old_owner {
2624 privilege.grantor = new_owner;
2625 } else if privilege.grantor == new_owner {
2626 new_present = true;
2627 }
2628 if privilege.grantee == old_owner {
2630 privilege.grantee = new_owner;
2631 } else if privilege.grantee == new_owner {
2632 new_present = true;
2633 }
2634 }
2635
2636 if new_present {
2640 let privilege_map: BTreeMap<_, Vec<_>> =
2642 flat_privileges
2643 .into_iter()
2644 .fold(BTreeMap::new(), |mut accum, privilege| {
2645 accum
2646 .entry((privilege.grantee, privilege.grantor))
2647 .or_default()
2648 .push(privilege);
2649 accum
2650 });
2651
2652 flat_privileges = privilege_map
2654 .into_iter()
2655 .map(|((grantee, grantor), values)|
2656 values.into_iter().fold(
2658 MzAclItem::empty(grantee, grantor),
2659 |mut accum, mz_aclitem| {
2660 accum.acl_mode =
2661 accum.acl_mode.union(mz_aclitem.acl_mode);
2662 accum
2663 },
2664 ))
2665 .collect();
2666 }
2667
2668 *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2669 }
2670}
2671
2672fn tx_replace_item(
2681 tx: &mut Transaction<'_>,
2682 state: &CatalogState,
2683 id: CatalogItemId,
2684 new_entry: CatalogEntry,
2685) -> Result<(), AdapterError> {
2686 let new_id = new_entry.id;
2687
2688 for use_id in new_entry.referenced_by() {
2690 if tx.get_item(use_id).is_none() {
2692 continue;
2693 }
2694
2695 let mut dependent = state.get_entry(use_id).clone();
2696 dependent.item = dependent.item.replace_item_refs(id, new_id);
2697 tx.update_item(*use_id, dependent.into())?;
2698 }
2699
2700 let old_comment_id = state.get_comment_id(ObjectId::Item(id));
2702 let new_comment_id = new_entry.comment_object_id();
2703 if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
2704 tx.drop_comments(&[old_comment_id].into())?;
2705 for (sub, comment) in comments {
2706 tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
2707 }
2708 }
2709
2710 let mz_catalog::durable::Item {
2711 id: _,
2712 oid,
2713 global_id,
2714 schema_id,
2715 name,
2716 create_sql,
2717 owner_id,
2718 privileges,
2719 extra_versions,
2720 } = new_entry.into();
2721
2722 tx.remove_item(id)?;
2723 tx.insert_item(
2724 new_id,
2725 oid,
2726 global_id,
2727 schema_id,
2728 &name,
2729 create_sql,
2730 owner_id,
2731 privileges,
2732 extra_versions,
2733 )?;
2734
2735 Ok(())
2736}
2737
2738fn apply_replacement_audit_events(
2740 state: &CatalogState,
2741 target: &CatalogEntry,
2742 replacement: &CatalogEntry,
2743) -> Vec<(EventType, EventDetails)> {
2744 let mut events = Vec::new();
2745
2746 let target_name = state.resolve_full_name(target.name(), target.conn_id());
2747 let target_id_name = IdFullNameV1 {
2748 id: target.id().to_string(),
2749 name: Catalog::full_name_detail(&target_name),
2750 };
2751 let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2752 let replacement_id_name = IdFullNameV1 {
2753 id: replacement.id().to_string(),
2754 name: Catalog::full_name_detail(&replacement_name),
2755 };
2756
2757 if Catalog::should_audit_log_item(&replacement.item) {
2758 events.push((
2759 EventType::Drop,
2760 EventDetails::IdFullNameV1(replacement_id_name.clone()),
2761 ));
2762 }
2763
2764 if Catalog::should_audit_log_item(&target.item) {
2765 events.push((
2766 EventType::Alter,
2767 EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2768 target: target_id_name.clone(),
2769 replacement: replacement_id_name,
2770 }),
2771 ));
2772
2773 if let Some(old_cluster_id) = target.cluster_id()
2774 && let Some(new_cluster_id) = replacement.cluster_id()
2775 && old_cluster_id != new_cluster_id
2776 {
2777 events.push((
2780 EventType::Alter,
2781 EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
2782 id: replacement.id().to_string(),
2783 name: target_id_name.name,
2784 old_cluster_id: old_cluster_id.to_string(),
2785 new_cluster_id: new_cluster_id.to_string(),
2786 }),
2787 ));
2788 }
2789 }
2790
2791 events
2792}
2793
2794#[derive(Debug, Default)]
2802pub(crate) struct ObjectsToDrop {
2803 pub comments: BTreeSet<CommentObjectId>,
2804 pub databases: BTreeSet<DatabaseId>,
2805 pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2806 pub clusters: BTreeSet<ClusterId>,
2807 pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2808 pub roles: BTreeSet<RoleId>,
2809 pub items: Vec<CatalogItemId>,
2810 pub network_policies: BTreeSet<NetworkPolicyId>,
2811}
2812
2813impl ObjectsToDrop {
2814 pub fn generate(
2815 drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2816 state: &CatalogState,
2817 session: Option<&ConnMeta>,
2818 ) -> Result<Self, AdapterError> {
2819 let mut delta = ObjectsToDrop::default();
2820
2821 for drop_object_info in drop_object_infos {
2822 delta.add_item(drop_object_info, state, session)?;
2823 }
2824
2825 Ok(delta)
2826 }
2827
2828 fn add_item(
2829 &mut self,
2830 drop_object_info: DropObjectInfo,
2831 state: &CatalogState,
2832 session: Option<&ConnMeta>,
2833 ) -> Result<(), AdapterError> {
2834 self.comments
2835 .insert(state.get_comment_id(drop_object_info.to_object_id()));
2836
2837 match drop_object_info {
2838 DropObjectInfo::Database(database_id) => {
2839 let database = &state.database_by_id[&database_id];
2840 if database_id.is_system() {
2841 return Err(AdapterError::Catalog(Error::new(
2842 ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2843 )));
2844 }
2845
2846 self.databases.insert(database_id);
2847 }
2848 DropObjectInfo::Schema((database_spec, schema_spec)) => {
2849 let schema = state.get_schema(
2850 &database_spec,
2851 &schema_spec,
2852 session
2853 .map(|session| session.conn_id())
2854 .unwrap_or(&SYSTEM_CONN_ID),
2855 );
2856 let schema_id: SchemaId = schema_spec.into();
2857 if schema_id.is_system() {
2858 let name = schema.name();
2859 let full_name = state.resolve_full_schema_name(name);
2860 return Err(AdapterError::Catalog(Error::new(
2861 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2862 )));
2863 }
2864
2865 self.schemas.insert(schema_spec, database_spec);
2866 }
2867 DropObjectInfo::Role(role_id) => {
2868 let name = state.get_role(&role_id).name().to_string();
2869 if role_id.is_system() || role_id.is_predefined() {
2870 return Err(AdapterError::Catalog(Error::new(
2871 ErrorKind::ReservedRoleName(name.clone()),
2872 )));
2873 }
2874 state.ensure_not_reserved_role(&role_id)?;
2875
2876 self.roles.insert(role_id);
2877 }
2878 DropObjectInfo::Cluster(cluster_id) => {
2879 let cluster = state.get_cluster(cluster_id);
2880 let name = &cluster.name;
2881 if cluster_id.is_system() {
2882 return Err(AdapterError::Catalog(Error::new(
2883 ErrorKind::ReadOnlyCluster(name.clone()),
2884 )));
2885 }
2886
2887 self.clusters.insert(cluster_id);
2888 }
2889 DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2890 let cluster = state.get_cluster(cluster_id);
2891 let replica = cluster.replica(replica_id).expect("Must exist");
2892
2893 self.replicas
2894 .insert(replica.replica_id, (cluster.id, reason));
2895
2896 for (_id, entry) in state.get_entries() {
2900 if let CatalogItem::MaterializedView(mv) = entry.item() {
2901 if mv.target_replica == Some(replica_id)
2902 && !self.items.contains(&entry.id())
2903 {
2904 tracing::warn!(
2905 "implicitly dropping materialized view {} because target replica was dropped",
2906 entry.name().item,
2907 );
2908 self.items.push(entry.id());
2909 }
2910 }
2911 }
2912 }
2913 DropObjectInfo::Item(item_id) => {
2914 let entry = state.get_entry(&item_id);
2915 if item_id.is_system() {
2916 let name = entry.name();
2917 let full_name =
2918 state.resolve_full_name(name, session.map(|session| session.conn_id()));
2919 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2920 full_name.to_string(),
2921 ))));
2922 }
2923
2924 self.items.push(item_id);
2925 }
2926 DropObjectInfo::NetworkPolicy(network_policy_id) => {
2927 let policy = state.get_network_policy(&network_policy_id);
2928 let name = &policy.name;
2929 if network_policy_id.is_system() {
2930 return Err(AdapterError::Catalog(Error::new(
2931 ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
2932 )));
2933 }
2934
2935 self.network_policies.insert(network_policy_id);
2936 }
2937 }
2938
2939 Ok(())
2940 }
2941}
2942
#[cfg(test)]
mod tests {
    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
    use mz_repr::role_id::RoleId;

    use crate::catalog::Catalog;

    /// Verifies that transferring ownership rewrites old-owner references and
    /// merges the resulting duplicate `(grantee, grantor)` pairs into one item.
    #[mz_ore::test]
    fn test_update_privilege_owners() {
        let old_owner = RoleId::User(1);
        let new_owner = RoleId::User(2);
        let other_role = RoleId::User(3);

        let acl = |grantee: RoleId, grantor: RoleId, acl_mode: AclMode| MzAclItem {
            grantee,
            grantor,
            acl_mode,
        };

        // Each case is (privileges before the transfer, the single item
        // expected afterwards).
        let cases = vec![
            // Old and new owner both granted a privilege to the same role.
            (
                vec![
                    acl(other_role, old_owner, AclMode::UPDATE),
                    acl(other_role, new_owner, AclMode::SELECT),
                ],
                acl(
                    other_role,
                    new_owner,
                    AclMode::SELECT.union(AclMode::UPDATE),
                ),
            ),
            // Old and new owner were both granted a privilege by the same role.
            (
                vec![
                    acl(old_owner, other_role, AclMode::UPDATE),
                    acl(new_owner, other_role, AclMode::SELECT),
                ],
                acl(
                    new_owner,
                    other_role,
                    AclMode::SELECT.union(AclMode::UPDATE),
                ),
            ),
            // Old and new owner each granted a privilege to themselves.
            (
                vec![
                    acl(old_owner, old_owner, AclMode::UPDATE),
                    acl(new_owner, new_owner, AclMode::SELECT),
                ],
                acl(
                    new_owner,
                    new_owner,
                    AclMode::SELECT.union(AclMode::UPDATE),
                ),
            ),
        ];

        for (initial, expected) in cases {
            let mut privileges = PrivilegeMap::from_mz_acl_items(initial);
            Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
            assert_eq!(1, privileges.all_values().count());
            assert_eq!(
                vec![expected],
                privileges.all_values_owned().collect::<Vec<_>>()
            );
        }
    }
}