mz_adapter/catalog/
transact.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to executing catalog transactions.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22    WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25    CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26    ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Transaction};
31use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
32use mz_catalog::memory::objects::{
33    CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, SourceReferences, StateDiff,
34    StateUpdate, StateUpdateKind, TemporaryItem,
35};
36use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
37use mz_controller_types::{ClusterId, ReplicaId};
38use mz_ore::collections::HashSet;
39use mz_ore::instrument;
40use mz_ore::now::EpochMillis;
41use mz_persist_types::ShardId;
42use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
43use mz_repr::network_policy_id::NetworkPolicyId;
44use mz_repr::role_id::RoleId;
45use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
46use mz_sql::ast::RawDataType;
47use mz_sql::catalog::{
48    CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
49    CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, PasswordAction, PasswordConfig,
50    RoleAttributesRaw, RoleMembership, RoleVars,
51};
52use mz_sql::names::{
53    CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
54    ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
55};
56use mz_sql::plan::{NetworkPolicyRule, PlanError};
57use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
58use mz_sql::session::vars::OwnedVarInput;
59use mz_sql::session::vars::{Value as VarValue, VarInput};
60use mz_sql::{DEFAULT_SCHEMA, rbac};
61use mz_sql_parser::ast::{QualifiedReplica, Value};
62use mz_storage_client::storage_collections::StorageCollections;
63use tracing::{info, trace};
64
65use crate::AdapterError;
66use crate::catalog::state::LocalExpressionCache;
67use crate::catalog::{
68    BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
69    catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
70    is_reserved_role_name, object_type_to_audit_object_type,
71    system_object_type_to_audit_object_type,
72};
73use crate::coord::ConnMeta;
74use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
75use crate::coord::cluster_scheduling::SchedulingDecision;
76use crate::util::ResultExt;
77
/// A single operation that can be applied as part of a catalog transaction.
///
/// A list of `Op`s is handed to [`Catalog::transact`], which works through them in order;
/// later ops observe the catalog state produced by earlier ones.
#[derive(Debug, Clone)]
pub enum Op {
    /// Changes the retain-history setting of the item `id`.
    ///
    /// Rejected with `ErrorKind::ReadOnlyItem` when `id` is a system item.
    AlterRetainHistory {
        id: CatalogItemId,
        value: Option<Value>,
        window: CompactionWindow,
    },
    /// Replaces the attributes and variables of the role `id`.
    ///
    /// When `nopassword` is set the role's password is cleared; otherwise a password present
    /// in `attributes` is set, and an absent one leaves the password unchanged.
    AlterRole {
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        nopassword: bool,
        vars: RoleVars,
    },
    /// Replaces the rules of the network policy `id`.
    ///
    /// Fails with `ErrorKind::ReservedNetworkPolicyName` if `name` is a reserved name.
    AlterNetworkPolicy {
        id: NetworkPolicyId,
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    /// Adds a column described by `name`, `typ` and `sql` to the item `id`.
    AlterAddColumn {
        id: CatalogItemId,
        new_global_id: GlobalId,
        name: ColumnName,
        typ: SqlColumnType,
        sql: RawDataType,
    },
    AlterMaterializedViewApplyReplacement {
        id: CatalogItemId,
        replacement_id: CatalogItemId,
    },
    CreateDatabase {
        name: String,
        owner_id: RoleId,
    },
    CreateSchema {
        database_id: ResolvedDatabaseSpecifier,
        schema_name: String,
        owner_id: RoleId,
    },
    CreateRole {
        name: String,
        attributes: RoleAttributesRaw,
    },
    CreateCluster {
        id: ClusterId,
        name: String,
        introspection_sources: Vec<&'static BuiltinLog>,
        owner_id: RoleId,
        config: ClusterConfig,
    },
    CreateClusterReplica {
        cluster_id: ClusterId,
        name: String,
        config: ReplicaConfig,
        owner_id: RoleId,
        reason: ReplicaCreateDropReason,
    },
    /// Creates a catalog item.
    ///
    /// Items for which `item.conn_id()` returns a connection are treated as temporary items
    /// and are checked for name collisions within that connection before the transaction
    /// starts.
    CreateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        item: CatalogItem,
        owner_id: RoleId,
    },
    CreateNetworkPolicy {
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    Comment {
        object_id: CommentObjectId,
        sub_component: Option<usize>,
        comment: Option<String>,
    },
    /// Drops the listed objects. See [`DropObjectInfo`] for what can be dropped.
    DropObjects(Vec<DropObjectInfo>),
    GrantRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    RenameCluster {
        id: ClusterId,
        name: String,
        to_name: String,
        check_reserved_names: bool,
    },
    RenameClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: QualifiedReplica,
        to_name: String,
    },
    RenameItem {
        id: CatalogItemId,
        current_full_name: FullItemName,
        to_name: String,
    },
    RenameSchema {
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        new_name: String,
        check_reserved_names: bool,
    },
    UpdateOwner {
        id: ObjectId,
        new_owner: RoleId,
    },
    UpdatePrivilege {
        target_id: SystemObjectId,
        privilege: MzAclItem,
        variant: UpdatePrivilegeVariant,
    },
    UpdateDefaultPrivilege {
        privilege_object: DefaultPrivilegeObject,
        privilege_acl_item: DefaultPrivilegeAclItem,
        variant: UpdatePrivilegeVariant,
    },
    RevokeRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    UpdateClusterConfig {
        id: ClusterId,
        name: String,
        config: ClusterConfig,
    },
    UpdateClusterReplicaConfig {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        config: ReplicaConfig,
    },
    UpdateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        to_item: CatalogItem,
    },
    UpdateSourceReferences {
        source_id: CatalogItemId,
        references: SourceReferences,
    },
    UpdateSystemConfiguration {
        name: String,
        value: OwnedVarInput,
    },
    ResetSystemConfiguration {
        name: String,
    },
    ResetAllSystemConfiguration,
    /// Performs updates to the storage usage table, which probably should be a builtin source.
    ///
    /// TODO(jkosh44) In a multi-writer or high availability catalog world, this
    /// might not work. If a process crashes after collecting storage usage events
    /// but before updating the builtin table, then another listening catalog
    /// will never know to update the builtin table.
    WeirdStorageUsageUpdates {
        object_id: Option<String>,
        size_bytes: u64,
        collection_timestamp: EpochMillis,
    },
    /// Performs a dry run of the commit, but errors with
    /// [`AdapterError::TransactionDryRun`].
    ///
    /// When using this value, it should be included only as the last element of
    /// the transaction and should not be the only value in the transaction.
    TransactionDryRun,
}
245
/// Almost the same as `ObjectId`, but the `ClusterReplica` case has an extra
/// `ReplicaCreateDropReason` field. This is forwarded to `mz_audit_events.details` when applying
/// the `Op::DropObjects`.
#[derive(Debug, Clone)]
pub enum DropObjectInfo {
    Cluster(ClusterId),
    /// A replica identified by its cluster and replica id, together with the reason it is
    /// being dropped.
    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
    Database(DatabaseId),
    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
    Role(RoleId),
    Item(CatalogItemId),
    NetworkPolicy(NetworkPolicyId),
}
259
260impl DropObjectInfo {
261    /// Creates a `DropObjectInfo` from an `ObjectId`.
262    /// If it is a `ClusterReplica`, the reason will be set to `ReplicaCreateDropReason::Manual`.
263    pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
264        match id {
265            ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
266            ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
267                cluster_id,
268                replica_id,
269                ReplicaCreateDropReason::Manual,
270            )),
271            ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
272            ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
273            ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
274            ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
275            ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
276        }
277    }
278
279    /// Creates an `ObjectId` from a `DropObjectInfo`.
280    /// Loses the `ReplicaCreateDropReason` if there is one!
281    fn to_object_id(&self) -> ObjectId {
282        match &self {
283            DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
284            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
285                ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
286            }
287            DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
288            DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
289            DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
290            DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
291            DropObjectInfo::NetworkPolicy(network_policy_id) => {
292                ObjectId::NetworkPolicy(network_policy_id.clone())
293            }
294        }
295    }
296}
297
/// The reason for creating or dropping a replica.
#[derive(Debug, Clone)]
pub enum ReplicaCreateDropReason {
    /// The user initiated the replica create or drop, e.g., by
    /// - creating/dropping a cluster,
    /// - ALTERing various options on a managed cluster,
    /// - CREATE/DROP CLUSTER REPLICA on an unmanaged cluster.
    Manual,
    /// The automated cluster scheduling initiated the replica create or drop, e.g., because a
    /// materialized view needs a refresh on a SCHEDULE ON REFRESH cluster. Carries the
    /// scheduling decisions that led to the create or drop.
    ClusterScheduling(Vec<SchedulingDecision>),
}
310
311impl ReplicaCreateDropReason {
312    pub fn into_audit_log(
313        self,
314    ) -> (
315        CreateOrDropClusterReplicaReasonV1,
316        Option<SchedulingDecisionsWithReasonsV2>,
317    ) {
318        let (reason, scheduling_policies) = match self {
319            ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
320            ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
321                CreateOrDropClusterReplicaReasonV1::Schedule,
322                Some(scheduling_decisions),
323            ),
324        };
325        (
326            reason,
327            scheduling_policies
328                .as_ref()
329                .map(SchedulingDecision::reasons_to_audit_log_reasons),
330        )
331    }
332}
333
/// The outputs of a successful [`Catalog::transact`] call.
pub struct TransactionResult {
    /// Builtin table updates produced by the transaction.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Parsed catalog updates from which we will derive catalog implications.
    pub catalog_updates: Vec<ParsedStateUpdate>,
    /// Audit log events recorded while applying the transaction's ops.
    pub audit_events: Vec<VersionedEvent>,
}
340
341impl Catalog {
342    fn should_audit_log_item(item: &CatalogItem) -> bool {
343        !item.is_temporary()
344    }
345
346    /// Gets [`CatalogItemId`]s of temporary items to be created, checks for name collisions
347    /// within a connection id.
348    fn temporary_ids(
349        &self,
350        ops: &[Op],
351        temporary_drops: BTreeSet<(&ConnectionId, String)>,
352    ) -> Result<BTreeSet<CatalogItemId>, Error> {
353        let mut creating = BTreeSet::new();
354        let mut temporary_ids = BTreeSet::new();
355        for op in ops.iter() {
356            if let Op::CreateItem {
357                id,
358                name,
359                item,
360                owner_id: _,
361            } = op
362            {
363                if let Some(conn_id) = item.conn_id() {
364                    if self.item_exists_in_temp_schemas(conn_id, &name.item)
365                        && !temporary_drops.contains(&(conn_id, name.item.clone()))
366                        || creating.contains(&(conn_id, &name.item))
367                    {
368                        return Err(
369                            SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
370                        );
371                    } else {
372                        creating.insert((conn_id, &name.item));
373                        temporary_ids.insert(id.clone());
374                    }
375                }
376            }
377        }
378        Ok(temporary_ids)
379    }
380
381    #[instrument(name = "catalog::transact")]
382    pub async fn transact(
383        &mut self,
384        // n.b. this is an option to prevent us from needing to build out a
385        // dummy impl of `StorageController` for tests.
386        storage_collections: Option<
387            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
388        >,
389        oracle_write_ts: mz_repr::Timestamp,
390        session: Option<&ConnMeta>,
391        ops: Vec<Op>,
392    ) -> Result<TransactionResult, AdapterError> {
393        trace!("transact: {:?}", ops);
394        fail::fail_point!("catalog_transact", |arg| {
395            Err(AdapterError::Unstructured(anyhow::anyhow!(
396                "failpoint: {arg:?}"
397            )))
398        });
399
400        let drop_ids: BTreeSet<CatalogItemId> = ops
401            .iter()
402            .filter_map(|op| match op {
403                Op::DropObjects(drop_object_infos) => {
404                    let ids = drop_object_infos.iter().map(|info| info.to_object_id());
405                    let item_ids = ids.filter_map(|id| match id {
406                        ObjectId::Item(id) => Some(id),
407                        _ => None,
408                    });
409                    Some(item_ids)
410                }
411                _ => None,
412            })
413            .flatten()
414            .collect();
415        let temporary_drops = drop_ids
416            .iter()
417            .filter_map(|id| {
418                let entry = self.get_entry(id);
419                match entry.item().conn_id() {
420                    Some(conn_id) => Some((conn_id, entry.name().item.clone())),
421                    None => None,
422                }
423            })
424            .collect();
425        let dropped_global_ids = drop_ids
426            .iter()
427            .flat_map(|item_id| self.get_global_ids(item_id))
428            .collect();
429
430        let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
431        let mut builtin_table_updates = vec![];
432        let mut catalog_updates = vec![];
433        let mut audit_events = vec![];
434        let mut storage = self.storage().await;
435        let mut tx = storage
436            .transaction()
437            .await
438            .unwrap_or_terminate("starting catalog transaction");
439
440        let new_state = Self::transact_inner(
441            storage_collections,
442            oracle_write_ts,
443            session,
444            ops,
445            temporary_ids,
446            &mut builtin_table_updates,
447            &mut catalog_updates,
448            &mut audit_events,
449            &mut tx,
450            &self.state,
451        )
452        .await?;
453
454        // The user closure was successful, apply the updates. Terminate the
455        // process if this fails, because we have to restart envd due to
456        // indeterminate catalog state, which we only reconcile during catalog
457        // init.
458        tx.commit(oracle_write_ts)
459            .await
460            .unwrap_or_terminate("catalog storage transaction commit must succeed");
461
462        // Dropping here keeps the mutable borrow on self, preventing us accidentally
463        // mutating anything until after f is executed.
464        drop(storage);
465        if let Some(new_state) = new_state {
466            self.transient_revision += 1;
467            self.state = new_state;
468        }
469
470        // Drop in-memory planning metadata.
471        let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
472        if self.state.system_config().enable_mz_notices() {
473            // Generate retractions for the Builtin tables.
474            self.state().pack_optimizer_notices(
475                &mut builtin_table_updates,
476                dropped_notices.iter(),
477                Diff::MINUS_ONE,
478            );
479        }
480
481        Ok(TransactionResult {
482            builtin_table_updates,
483            catalog_updates,
484            audit_events,
485        })
486    }
487
488    /// Performs the transaction described by `ops` and returns the new state of the catalog, if
489    /// it has changed. If `ops` don't result in a change in the state this method returns `None`.
490    ///
491    /// # Panics
492    /// - If `ops` contains [`Op::TransactionDryRun`] and the value is not the
493    ///   final element.
494    /// - If the only element of `ops` is [`Op::TransactionDryRun`].
495    #[instrument(name = "catalog::transact_inner")]
496    async fn transact_inner(
497        storage_collections: Option<
498            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
499        >,
500        oracle_write_ts: mz_repr::Timestamp,
501        session: Option<&ConnMeta>,
502        mut ops: Vec<Op>,
503        temporary_ids: BTreeSet<CatalogItemId>,
504        builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
505        parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
506        audit_events: &mut Vec<VersionedEvent>,
507        tx: &mut Transaction<'_>,
508        state: &CatalogState,
509    ) -> Result<Option<CatalogState>, AdapterError> {
510        // We come up with new catalog state, builtin state updates, and parsed
511        // catalog updates (for deriving catalog implications) in two phases:
512        //
513        // 1. We (cow)-clone catalog state as `preliminary_state` and apply ops
514        //    one-by-one. This will give us the full list of updates to apply to
515        //    the catalog, which will allow us to apply it in one batch, which
516        //    in turn will allow the apply machinery to consolidate the updates.
517        // 2. We do one final apply call with all updates, which gives us the
518        //    final builtin table updates and parsed catalog updates.
519        //
520        // The reason is that the loop that is working off ops first does a
521        // transact_op to derive the state updates for that op, and then calls
522        // apply_updates on the catalog state. And successive ops might expect
523        // the catalog state to reflect the modified state _after_ applying
524        // previous ops.
525        //
526        // We want to, however, have one final apply_state that takes all the
527        // accumulated updates to derive the required controller updates and the
528        // builtin table updates.
529        //
530        // We won't win any DDL throughput benchmarks, but so far that's not
531        // what we're optimizing for and there would probably be other
532        // bottlenecks before we hit this one as a bottleneck.
533        //
534        // We could work around this by refactoring how the interplay of
535        // transact_op and apply_updates works, but that's a larger undertaking.
536        let mut preliminary_state = Cow::Borrowed(state);
537
538        // The final state that we will return, if modified.
539        let mut state = Cow::Borrowed(state);
540
541        let dry_run_ops = match ops.last() {
542            Some(Op::TransactionDryRun) => {
543                // Remove dry run marker.
544                ops.pop();
545                assert!(!ops.is_empty(), "TransactionDryRun must not be the only op");
546                ops.clone()
547            }
548            Some(_) => vec![],
549            None => return Ok(None),
550        };
551
552        let mut storage_collections_to_create = BTreeSet::new();
553        let mut storage_collections_to_drop = BTreeSet::new();
554        let mut storage_collections_to_register = BTreeMap::new();
555
556        let mut updates = Vec::new();
557
558        for op in ops {
559            let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
560                oracle_write_ts,
561                session,
562                op,
563                &temporary_ids,
564                audit_events,
565                tx,
566                &*preliminary_state,
567                &mut storage_collections_to_create,
568                &mut storage_collections_to_drop,
569                &mut storage_collections_to_register,
570            )
571            .await?;
572
573            // Certain builtin tables are not derived from the durable catalog state, so they need
574            // to be updated ad-hoc based on the current transaction. This is weird and will not
575            // work if we ever want multi-writer catalogs or high availability (HA) catalogs. If
576            // this instance crashes after committing the durable catalog but before applying the
577            // weird builtin table updates, then they'll be lost forever in a multi-writer or HA
578            // world. Currently, this works fine and there are no correctness issues because
579            // whenever a Coordinator crashes, a new Coordinator starts up, and it will set the
580            // state of all builtin tables to the correct values.
581            if let Some(builtin_table_update) = weird_builtin_table_update {
582                builtin_table_updates.push(builtin_table_update);
583            }
584
585            // Temporary items are not stored in the durable catalog, so they need to be handled
586            // separately for updating state and builtin tables.
587            // TODO(jkosh44) Some more thought needs to be given as to how temporary tables work
588            // in a multi-subscriber catalog world.
589            let upper = tx.upper();
590            let temporary_item_updates =
591                temporary_item_updates
592                    .into_iter()
593                    .map(|(item, diff)| StateUpdate {
594                        kind: StateUpdateKind::TemporaryItem(item),
595                        ts: upper,
596                        diff,
597                    });
598
599            let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
600            op_updates.extend(temporary_item_updates);
601            if !op_updates.is_empty() {
602                let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
603                    .to_mut()
604                    .apply_updates(op_updates.clone(), &mut LocalExpressionCache::Closed)
605                    .await;
606            }
607            updates.append(&mut op_updates);
608        }
609
610        if !updates.is_empty() {
611            let (op_builtin_table_updates, op_catalog_updates) = state
612                .to_mut()
613                .apply_updates(updates.clone(), &mut LocalExpressionCache::Closed)
614                .await;
615            let op_builtin_table_updates = state
616                .to_mut()
617                .resolve_builtin_table_updates(op_builtin_table_updates);
618            builtin_table_updates.extend(op_builtin_table_updates);
619            parsed_catalog_updates.extend(op_catalog_updates);
620        }
621
622        if dry_run_ops.is_empty() {
623            // `storage_collections` should only be `None` for tests.
624            if let Some(c) = storage_collections {
625                c.prepare_state(
626                    tx,
627                    storage_collections_to_create,
628                    storage_collections_to_drop,
629                    storage_collections_to_register,
630                )
631                .await?;
632            }
633
634            let updates = tx.get_and_commit_op_updates();
635            if !updates.is_empty() {
636                let (op_builtin_table_updates, op_catalog_updates) = state
637                    .to_mut()
638                    .apply_updates(updates.clone(), &mut LocalExpressionCache::Closed)
639                    .await;
640                let op_builtin_table_updates = state
641                    .to_mut()
642                    .resolve_builtin_table_updates(op_builtin_table_updates);
643                builtin_table_updates.extend(op_builtin_table_updates);
644                parsed_catalog_updates.extend(op_catalog_updates);
645            }
646
647            match state {
648                Cow::Owned(state) => Ok(Some(state)),
649                Cow::Borrowed(_) => Ok(None),
650            }
651        } else {
652            Err(AdapterError::TransactionDryRun {
653                new_ops: dry_run_ops,
654                new_state: state.into_owned(),
655            })
656        }
657    }
658
659    /// Performs the transaction operation described by `op`. This function prepares the changes in
660    /// `tx`, but does not update `state`. `state` will be updated when applying the durable
661    /// changes.
662    ///
663    /// Optionally returns a builtin table update for any builtin table updates than cannot be
664    /// derived from the durable catalog state, and temporary item diffs. These are all very weird
665    /// scenarios and ideally in the future don't exist.
666    #[instrument]
667    async fn transact_op(
668        oracle_write_ts: mz_repr::Timestamp,
669        session: Option<&ConnMeta>,
670        op: Op,
671        temporary_ids: &BTreeSet<CatalogItemId>,
672        audit_events: &mut Vec<VersionedEvent>,
673        tx: &mut Transaction<'_>,
674        state: &CatalogState,
675        storage_collections_to_create: &mut BTreeSet<GlobalId>,
676        storage_collections_to_drop: &mut BTreeSet<GlobalId>,
677        storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
678    ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
679        let mut weird_builtin_table_update = None;
680        let mut temporary_item_updates = Vec::new();
681
682        match op {
683            Op::TransactionDryRun => {
684                unreachable!("TransactionDryRun can only be used a final element of ops")
685            }
686            Op::AlterRetainHistory { id, value, window } => {
687                let entry = state.get_entry(&id);
688                if id.is_system() {
689                    let name = entry.name();
690                    let full_name =
691                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
692                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
693                        full_name.to_string(),
694                    ))));
695                }
696
697                let mut new_entry = entry.clone();
698                let previous = new_entry
699                    .item
700                    .update_retain_history(value.clone(), window)
701                    .map_err(|_| {
702                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
703                            "planner should have rejected invalid alter retain history item type"
704                                .to_string(),
705                        )))
706                    })?;
707
708                if Self::should_audit_log_item(new_entry.item()) {
709                    let details =
710                        EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
711                            id: id.to_string(),
712                            old_history: previous.map(|previous| previous.to_string()),
713                            new_history: value.map(|v| v.to_string()),
714                        });
715                    CatalogState::add_to_audit_log(
716                        &state.system_configuration,
717                        oracle_write_ts,
718                        session,
719                        tx,
720                        audit_events,
721                        EventType::Alter,
722                        catalog_type_to_audit_object_type(new_entry.item().typ()),
723                        details,
724                    )?;
725                }
726
727                tx.update_item(id, new_entry.into())?;
728
729                Self::log_update(state, &id);
730            }
731            Op::AlterRole {
732                id,
733                name,
734                attributes,
735                nopassword,
736                vars,
737            } => {
                   // Updates an existing role: overwrites its attributes and variables, applies
                   // the requested password action, and appends an `Alter`/`Role` audit event.
                   //
                   // The reserved-role check runs first; an error from it aborts the op via `?`.
738                state.ensure_not_reserved_role(&id)?;
739
                   // Work on a clone of the role held in `state`; the modified copy is what gets
                   // handed to the durable transaction below.
740                let mut existing_role = state.get_role(&id).clone();
                   // `attributes` is consumed by `into()` a few lines down, so the password and
                   // the SCRAM iteration count are pulled out first. The iteration count falls
                   // back to the system configuration value when the op leaves it unset.
741                let password = attributes.password.clone();
742                let scram_iterations = attributes
743                    .scram_iterations
744                    .unwrap_or_else(|| state.system_config().scram_iterations());
745                existing_role.attributes = attributes.into();
746                existing_role.vars = vars;
                   // `nopassword` takes precedence over a supplied password; when neither is
                   // given the action is `PasswordAction::NoChange`.
747                let password_action = if nopassword {
748                    PasswordAction::Clear
749                } else if let Some(password) = password {
750                    PasswordAction::Set(PasswordConfig {
751                        password,
752                        scram_iterations,
753                    })
754                } else {
755                    PasswordAction::NoChange
756                };
757                tx.update_role(id, existing_role.into(), password_action)?;
758
                   // `name` is cloned into the audit details because it is still needed for the
                   // log line at the end of the arm.
759                CatalogState::add_to_audit_log(
760                    &state.system_configuration,
761                    oracle_write_ts,
762                    session,
763                    tx,
764                    audit_events,
765                    EventType::Alter,
766                    ObjectType::Role,
767                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
768                        id: id.to_string(),
769                        name: name.clone(),
770                    }),
771                )?;
772
773                info!("update role {name} ({id})");
774            }
775            Op::AlterNetworkPolicy {
776                id,
777                rules,
778                name,
779                owner_id: _owner_id,
780            } => {
781                let existing_policy = state.get_network_policy(&id).clone();
782                let mut policy: NetworkPolicy = existing_policy.into();
783                policy.rules = rules;
784                if is_reserved_name(&name) {
785                    return Err(AdapterError::Catalog(Error::new(
786                        ErrorKind::ReservedNetworkPolicyName(name),
787                    )));
788                }
789                tx.update_network_policy(id, policy.clone())?;
790
791                CatalogState::add_to_audit_log(
792                    &state.system_configuration,
793                    oracle_write_ts,
794                    session,
795                    tx,
796                    audit_events,
797                    EventType::Alter,
798                    ObjectType::NetworkPolicy,
799                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
800                        id: id.to_string(),
801                        name: name.clone(),
802                    }),
803                )?;
804
805                info!("update network policy {name} ({id})");
806            }
807            Op::AlterAddColumn {
808                id,
809                new_global_id,
810                name,
811                typ,
812                sql,
813            } => {
                   // Adds a column to a table: records the column on a clone of the catalog
                   // entry, maps the resulting version to `new_global_id`, persists the entry,
                   // and queues `new_global_id` for registration against the table's shard.
                   //
                   // NOTE(review): unlike the neighbouring ALTER arms, no audit event is
                   // appended here -- confirm that this is intentional.
814                let mut new_entry = state.get_entry(&id).clone();
                   // `add_column` returns the version under which the new `GlobalId` is stored
                   // in `table.collections` below.
815                let version = new_entry.item.add_column(name, typ, sql)?;
816                // All versions of a table share the same shard, so it shouldn't matter what
817                // GlobalId we use here.
818                let shard_id = state
819                    .storage_metadata()
820                    .get_collection_shard(new_entry.latest_global_id())?;
821
822                // TODO(alter_table): Support adding columns to sources.
823                let CatalogItem::Table(table) = &mut new_entry.item else {
824                    return Err(AdapterError::Unsupported("adding columns to non-Table"));
825                };
826                table.collections.insert(version, new_global_id);
827
828                tx.update_item(id, new_entry.into())?;
                   // The new collection is registered against the shard looked up above rather
                   // than added to `storage_collections_to_create`.
829                storage_collections_to_register.insert(new_global_id, shard_id);
830            }
831            Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
                   // Applies a replacement materialized view to its target: the target entry
                   // takes over the replacement via `apply_replacement`, the replacement item
                   // and its comments are removed, and the precomputed audit events are
                   // appended.
832                let mut new_entry = state.get_entry(&id).clone();
833                let replacement = state.get_entry(&replacement_id);
834
                   // The audit events are computed here, while `new_entry` is still an
                   // unmodified clone of the target entry.
835                let new_audit_events =
836                    apply_replacement_audit_events(state, &new_entry, replacement);
837
                   // Both ids must refer to materialized views; anything else is reported as an
                   // internal error.
838                let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
839                    return Err(AdapterError::internal(
840                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
841                        "id must refer to a materialized view",
842                    ));
843                };
844                let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
845                    return Err(AdapterError::internal(
846                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
847                        "replacement_id must refer to a materialized view",
848                    ));
849                };
850
                   // `replacement_mv` is borrowed from `state`, so it is cloned before being
                   // handed over.
851                mv.apply_replacement(replacement_mv.clone());
852
                   // Remove the replacement item and persist the updated target.
853                tx.remove_item(replacement_id)?;
854                tx.update_item(id, new_entry.into())?;
855
                   // Drop any comments that were attached to the replacement.
856                let comment_id = CommentObjectId::MaterializedView(replacement_id);
857                tx.drop_comments(&[comment_id].into())?;
858
859                for (event_type, details) in new_audit_events {
860                    CatalogState::add_to_audit_log(
861                        &state.system_configuration,
862                        oracle_write_ts,
863                        session,
864                        tx,
865                        audit_events,
866                        event_type,
867                        ObjectType::MaterializedView,
868                        details,
869                    )?;
870                }
871            }
872            Op::CreateDatabase { name, owner_id } => {
873                let database_owner_privileges = vec![rbac::owner_privilege(
874                    mz_sql::catalog::ObjectType::Database,
875                    owner_id,
876                )];
877                let database_default_privileges = state
878                    .default_privileges
879                    .get_applicable_privileges(
880                        owner_id,
881                        None,
882                        None,
883                        mz_sql::catalog::ObjectType::Database,
884                    )
885                    .map(|item| item.mz_acl_item(owner_id));
886                let database_privileges: Vec<_> = merge_mz_acl_items(
887                    database_owner_privileges
888                        .into_iter()
889                        .chain(database_default_privileges),
890                )
891                .collect();
892
893                let schema_owner_privileges = vec![rbac::owner_privilege(
894                    mz_sql::catalog::ObjectType::Schema,
895                    owner_id,
896                )];
897                let schema_default_privileges = state
898                    .default_privileges
899                    .get_applicable_privileges(
900                        owner_id,
901                        None,
902                        None,
903                        mz_sql::catalog::ObjectType::Schema,
904                    )
905                    .map(|item| item.mz_acl_item(owner_id))
906                    // Special default privilege on public schemas.
907                    .chain(std::iter::once(MzAclItem {
908                        grantee: RoleId::Public,
909                        grantor: owner_id,
910                        acl_mode: AclMode::USAGE,
911                    }));
912                let schema_privileges: Vec<_> = merge_mz_acl_items(
913                    schema_owner_privileges
914                        .into_iter()
915                        .chain(schema_default_privileges),
916                )
917                .collect();
918
919                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
920                let (database_id, _) = tx.insert_user_database(
921                    &name,
922                    owner_id,
923                    database_privileges.clone(),
924                    &temporary_oids,
925                )?;
926                let (schema_id, _) = tx.insert_user_schema(
927                    database_id,
928                    DEFAULT_SCHEMA,
929                    owner_id,
930                    schema_privileges.clone(),
931                    &temporary_oids,
932                )?;
933                CatalogState::add_to_audit_log(
934                    &state.system_configuration,
935                    oracle_write_ts,
936                    session,
937                    tx,
938                    audit_events,
939                    EventType::Create,
940                    ObjectType::Database,
941                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
942                        id: database_id.to_string(),
943                        name: name.clone(),
944                    }),
945                )?;
946                info!("create database {}", name);
947
948                CatalogState::add_to_audit_log(
949                    &state.system_configuration,
950                    oracle_write_ts,
951                    session,
952                    tx,
953                    audit_events,
954                    EventType::Create,
955                    ObjectType::Schema,
956                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
957                        id: schema_id.to_string(),
958                        name: DEFAULT_SCHEMA.to_string(),
959                        database_name: Some(name),
960                    }),
961                )?;
962            }
963            Op::CreateSchema {
964                database_id,
965                schema_name,
966                owner_id,
967            } => {
968                if is_reserved_name(&schema_name) {
969                    return Err(AdapterError::Catalog(Error::new(
970                        ErrorKind::ReservedSchemaName(schema_name),
971                    )));
972                }
973                let database_id = match database_id {
974                    ResolvedDatabaseSpecifier::Id(id) => id,
975                    ResolvedDatabaseSpecifier::Ambient => {
976                        return Err(AdapterError::Catalog(Error::new(
977                            ErrorKind::ReadOnlySystemSchema(schema_name),
978                        )));
979                    }
980                };
981                let owner_privileges = vec![rbac::owner_privilege(
982                    mz_sql::catalog::ObjectType::Schema,
983                    owner_id,
984                )];
985                let default_privileges = state
986                    .default_privileges
987                    .get_applicable_privileges(
988                        owner_id,
989                        Some(database_id),
990                        None,
991                        mz_sql::catalog::ObjectType::Schema,
992                    )
993                    .map(|item| item.mz_acl_item(owner_id));
994                let privileges: Vec<_> =
995                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
996                        .collect();
997                let (schema_id, _) = tx.insert_user_schema(
998                    database_id,
999                    &schema_name,
1000                    owner_id,
1001                    privileges.clone(),
1002                    &state.get_temporary_oids().collect(),
1003                )?;
1004                CatalogState::add_to_audit_log(
1005                    &state.system_configuration,
1006                    oracle_write_ts,
1007                    session,
1008                    tx,
1009                    audit_events,
1010                    EventType::Create,
1011                    ObjectType::Schema,
1012                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1013                        id: schema_id.to_string(),
1014                        name: schema_name.clone(),
1015                        database_name: Some(state.database_by_id[&database_id].name.clone()),
1016                    }),
1017                )?;
1018            }
1019            Op::CreateRole { name, attributes } => {
1020                if is_reserved_role_name(&name) {
1021                    return Err(AdapterError::Catalog(Error::new(
1022                        ErrorKind::ReservedRoleName(name),
1023                    )));
1024                }
1025                let membership = RoleMembership::new();
1026                let vars = RoleVars::default();
1027                let (id, _) = tx.insert_user_role(
1028                    name.clone(),
1029                    attributes.clone(),
1030                    membership.clone(),
1031                    vars.clone(),
1032                    &state.get_temporary_oids().collect(),
1033                )?;
1034                CatalogState::add_to_audit_log(
1035                    &state.system_configuration,
1036                    oracle_write_ts,
1037                    session,
1038                    tx,
1039                    audit_events,
1040                    EventType::Create,
1041                    ObjectType::Role,
1042                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1043                        id: id.to_string(),
1044                        name: name.clone(),
1045                    }),
1046                )?;
1047                info!("create role {}", name);
1048            }
1049            Op::CreateCluster {
1050                id,
1051                name,
1052                introspection_sources,
1053                owner_id,
1054                config,
1055            } => {
1056                if is_reserved_name(&name) {
1057                    return Err(AdapterError::Catalog(Error::new(
1058                        ErrorKind::ReservedClusterName(name),
1059                    )));
1060                }
1061                let owner_privileges = vec![rbac::owner_privilege(
1062                    mz_sql::catalog::ObjectType::Cluster,
1063                    owner_id,
1064                )];
1065                let default_privileges = state
1066                    .default_privileges
1067                    .get_applicable_privileges(
1068                        owner_id,
1069                        None,
1070                        None,
1071                        mz_sql::catalog::ObjectType::Cluster,
1072                    )
1073                    .map(|item| item.mz_acl_item(owner_id));
1074                let privileges: Vec<_> =
1075                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1076                        .collect();
1077                let introspection_source_ids: Vec<_> = introspection_sources
1078                    .iter()
1079                    .map(|introspection_source| {
1080                        Transaction::allocate_introspection_source_index_id(
1081                            &id,
1082                            introspection_source.variant,
1083                        )
1084                    })
1085                    .collect();
1086
1087                let introspection_sources = introspection_sources
1088                    .into_iter()
1089                    .zip_eq(introspection_source_ids)
1090                    .map(|(log, (item_id, gid))| (log, item_id, gid))
1091                    .collect();
1092
1093                tx.insert_user_cluster(
1094                    id,
1095                    &name,
1096                    introspection_sources,
1097                    owner_id,
1098                    privileges.clone(),
1099                    config.clone().into(),
1100                    &state.get_temporary_oids().collect(),
1101                )?;
1102                CatalogState::add_to_audit_log(
1103                    &state.system_configuration,
1104                    oracle_write_ts,
1105                    session,
1106                    tx,
1107                    audit_events,
1108                    EventType::Create,
1109                    ObjectType::Cluster,
1110                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1111                        id: id.to_string(),
1112                        name: name.clone(),
1113                    }),
1114                )?;
1115                info!("create cluster {}", name);
1116            }
1117            Op::CreateClusterReplica {
1118                cluster_id,
1119                name,
1120                config,
1121                owner_id,
1122                reason,
1123            } => {
                    // Inserts a replica into an existing cluster. A `Create`/`ClusterReplica`
                    // audit event is appended only when the replica's location is
                    // `ReplicaLocation::Managed`; for any other location nothing is logged.
1124                if is_reserved_name(&name) {
1125                    return Err(AdapterError::Catalog(Error::new(
1126                        ErrorKind::ReservedReplicaName(name),
1127                    )));
1128                }
                    // The cluster is looked up for its name, which goes into the audit details.
1129                let cluster = state.get_cluster(cluster_id);
                    // `config` is cloned for the insert because `config.location` is inspected
                    // right after.
1130                let id =
1131                    tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1132                if let ReplicaLocation::Managed(ManagedReplicaLocation {
1133                    size,
1134                    billed_as,
1135                    internal,
1136                    ..
1137                }) = &config.location
1138                {
                        // `reason` is split into the two pieces the V4 event stores separately.
1139                    let (reason, scheduling_policies) = reason.into_audit_log();
1140                    let details = EventDetails::CreateClusterReplicaV4(
1141                        mz_audit_log::CreateClusterReplicaV4 {
1142                            cluster_id: cluster_id.to_string(),
1143                            cluster_name: cluster.name.clone(),
1144                            replica_id: Some(id.to_string()),
1145                            replica_name: name.clone(),
1146                            logical_size: size.clone(),
1147                            billed_as: billed_as.clone(),
1148                            internal: *internal,
1149                            reason,
1150                            scheduling_policies,
1151                        },
1152                    );
1153                    CatalogState::add_to_audit_log(
1154                        &state.system_configuration,
1155                        oracle_write_ts,
1156                        session,
1157                        tx,
1158                        audit_events,
1159                        EventType::Create,
1160                        ObjectType::ClusterReplica,
1161                        details,
1162                    )?;
1163                }
1164            }
1165            Op::CreateItem {
1166                id,
1167                name,
1168                item,
1169                owner_id,
1170            } => {
1171                state.check_unstable_dependencies(&item)?;
1172
1173                match &item {
1174                    CatalogItem::Table(table) => {
1175                        let gids: Vec<_> = table.global_ids().collect();
1176                        assert_eq!(gids.len(), 1);
1177                        storage_collections_to_create.extend(gids);
1178                    }
1179                    CatalogItem::Source(source) => {
1180                        storage_collections_to_create.insert(source.global_id());
1181                    }
1182                    CatalogItem::MaterializedView(mv) => {
1183                        let mv_gid = mv.global_id_writes();
1184                        if let Some(target_id) = mv.replacement_target {
1185                            let target_gid = state.get_entry(&target_id).latest_global_id();
1186                            let shard_id =
1187                                state.storage_metadata().get_collection_shard(target_gid)?;
1188                            storage_collections_to_register.insert(mv_gid, shard_id);
1189                        } else {
1190                            storage_collections_to_create.insert(mv_gid);
1191                        }
1192                    }
1193                    CatalogItem::ContinualTask(ct) => {
1194                        storage_collections_to_create.insert(ct.global_id());
1195                    }
1196                    CatalogItem::Sink(sink) => {
1197                        storage_collections_to_create.insert(sink.global_id());
1198                    }
1199                    CatalogItem::Log(_)
1200                    | CatalogItem::View(_)
1201                    | CatalogItem::Index(_)
1202                    | CatalogItem::Type(_)
1203                    | CatalogItem::Func(_)
1204                    | CatalogItem::Secret(_)
1205                    | CatalogItem::Connection(_) => (),
1206                }
1207
1208                let system_user = session.map_or(false, |s| s.user().is_system_user());
1209                if !system_user {
1210                    if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1211                        let cluster_name = state.clusters_by_id[&id].name.clone();
1212                        return Err(AdapterError::Catalog(Error::new(
1213                            ErrorKind::ReadOnlyCluster(cluster_name),
1214                        )));
1215                    }
1216                }
1217
1218                let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1219                let default_privileges = state
1220                    .default_privileges
1221                    .get_applicable_privileges(
1222                        owner_id,
1223                        name.qualifiers.database_spec.id(),
1224                        Some(name.qualifiers.schema_spec.into()),
1225                        item.typ().into(),
1226                    )
1227                    .map(|item| item.mz_acl_item(owner_id));
1228                // mz_support can read all progress sources.
1229                let progress_source_privilege = if item.is_progress_source() {
1230                    Some(MzAclItem {
1231                        grantee: MZ_SUPPORT_ROLE_ID,
1232                        grantor: owner_id,
1233                        acl_mode: AclMode::SELECT,
1234                    })
1235                } else {
1236                    None
1237                };
1238                let privileges: Vec<_> = merge_mz_acl_items(
1239                    owner_privileges
1240                        .into_iter()
1241                        .chain(default_privileges)
1242                        .chain(progress_source_privilege),
1243                )
1244                .collect();
1245
1246                let temporary_oids = state.get_temporary_oids().collect();
1247
1248                if item.is_temporary() {
1249                    if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1250                        || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1251                    {
1252                        return Err(AdapterError::Catalog(Error::new(
1253                            ErrorKind::InvalidTemporarySchema,
1254                        )));
1255                    }
1256                    let oid = tx.allocate_oid(&temporary_oids)?;
1257
1258                    let schema_id = name.qualifiers.schema_spec.clone().into();
1259                    let item_type = item.typ();
1260                    let (create_sql, global_id, versions) = item.to_serialized();
1261
1262                    let item = TemporaryItem {
1263                        id,
1264                        oid,
1265                        global_id,
1266                        schema_id,
1267                        name: name.item.clone(),
1268                        create_sql,
1269                        conn_id: item.conn_id().cloned(),
1270                        owner_id,
1271                        privileges: privileges.clone(),
1272                        extra_versions: versions,
1273                    };
1274                    temporary_item_updates.push((item, StateDiff::Addition));
1275
1276                    info!(
1277                        "create temporary {} {} ({})",
1278                        item_type,
1279                        state.resolve_full_name(&name, None),
1280                        id
1281                    );
1282                } else {
1283                    if let Some(temp_id) =
1284                        item.uses()
1285                            .iter()
1286                            .find(|id| match state.try_get_entry(*id) {
1287                                Some(entry) => entry.item().is_temporary(),
1288                                None => temporary_ids.contains(id),
1289                            })
1290                    {
1291                        let temp_item = state.get_entry(temp_id);
1292                        return Err(AdapterError::Catalog(Error::new(
1293                            ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1294                        )));
1295                    }
1296                    if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1297                        && !system_user
1298                    {
1299                        let schema_name = state
1300                            .resolve_full_name(&name, session.map(|session| session.conn_id()))
1301                            .schema;
1302                        return Err(AdapterError::Catalog(Error::new(
1303                            ErrorKind::ReadOnlySystemSchema(schema_name),
1304                        )));
1305                    }
1306                    let schema_id = name.qualifiers.schema_spec.clone().into();
1307                    let item_type = item.typ();
1308                    let (create_sql, global_id, versions) = item.to_serialized();
1309                    tx.insert_user_item(
1310                        id,
1311                        global_id,
1312                        schema_id,
1313                        &name.item,
1314                        create_sql,
1315                        owner_id,
1316                        privileges.clone(),
1317                        &temporary_oids,
1318                        versions,
1319                    )?;
1320                    info!(
1321                        "create {} {} ({})",
1322                        item_type,
1323                        state.resolve_full_name(&name, None),
1324                        id
1325                    );
1326                }
1327
1328                if Self::should_audit_log_item(&item) {
1329                    let name = Self::full_name_detail(
1330                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1331                    );
1332                    let details = match &item {
1333                        CatalogItem::Source(s) => {
1334                            let cluster_id = match s.data_source {
1335                                // Ingestion exports don't have their own cluster, but
1336                                // run on their ingestion's cluster.
1337                                DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1338                                    match state.get_entry(&ingestion_id).cluster_id() {
1339                                        Some(cluster_id) => Some(cluster_id.to_string()),
1340                                        None => None,
1341                                    }
1342                                }
1343                                _ => match item.cluster_id() {
1344                                    Some(cluster_id) => Some(cluster_id.to_string()),
1345                                    None => None,
1346                                },
1347                            };
1348
1349                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1350                                id: id.to_string(),
1351                                cluster_id,
1352                                name,
1353                                external_type: s.source_type().to_string(),
1354                            })
1355                        }
1356                        CatalogItem::Sink(s) => {
1357                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1358                                id: id.to_string(),
1359                                cluster_id: Some(s.cluster_id.to_string()),
1360                                name,
1361                                external_type: s.sink_type().to_string(),
1362                            })
1363                        }
1364                        CatalogItem::Index(i) => {
1365                            EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1366                                id: id.to_string(),
1367                                name,
1368                                cluster_id: i.cluster_id.to_string(),
1369                            })
1370                        }
1371                        CatalogItem::MaterializedView(mv) => {
1372                            EventDetails::CreateMaterializedViewV1(
1373                                mz_audit_log::CreateMaterializedViewV1 {
1374                                    id: id.to_string(),
1375                                    name,
1376                                    cluster_id: mv.cluster_id.to_string(),
1377                                    replacement_target_id: mv
1378                                        .replacement_target
1379                                        .map(|id| id.to_string()),
1380                                },
1381                            )
1382                        }
1383                        _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1384                            id: id.to_string(),
1385                            name,
1386                        }),
1387                    };
1388                    CatalogState::add_to_audit_log(
1389                        &state.system_configuration,
1390                        oracle_write_ts,
1391                        session,
1392                        tx,
1393                        audit_events,
1394                        EventType::Create,
1395                        catalog_type_to_audit_object_type(item.typ()),
1396                        details,
1397                    )?;
1398                }
1399            }
1400            Op::CreateNetworkPolicy {
1401                rules,
1402                name,
1403                owner_id,
1404            } => {
                    // Creates a user network policy and appends a `Create`/`NetworkPolicy`
                    // audit event.
                    //
                    // The duplicate-name check runs before the reserved-name check; a duplicate
                    // is reported as a plan-level catalog error, a reserved name as an adapter
                    // catalog error.
1405                if state.network_policies_by_name.contains_key(&name) {
1406                    return Err(AdapterError::PlanError(PlanError::Catalog(
1407                        SqlCatalogError::NetworkPolicyAlreadyExists(name),
1408                    )));
1409                }
1410                if is_reserved_name(&name) {
1411                    return Err(AdapterError::Catalog(Error::new(
1412                        ErrorKind::ReservedNetworkPolicyName(name),
1413                    )));
1414                }
1415
                    // Privileges: the owner privilege merged with the applicable default
                    // privileges for network policies.
1416                let owner_privileges = vec![rbac::owner_privilege(
1417                    mz_sql::catalog::ObjectType::NetworkPolicy,
1418                    owner_id,
1419                )];
1420                let default_privileges = state
1421                    .default_privileges
1422                    .get_applicable_privileges(
1423                        owner_id,
1424                        None,
1425                        None,
1426                        mz_sql::catalog::ObjectType::NetworkPolicy,
1427                    )
1428                    .map(|item| item.mz_acl_item(owner_id));
1429                let privileges: Vec<_> =
1430                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1431                        .collect();
1432
1433                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
                    // `name` is cloned because it is needed again for the audit event and the
                    // log line.
1434                let id = tx.insert_user_network_policy(
1435                    name.clone(),
1436                    rules,
1437                    privileges,
1438                    owner_id,
1439                    &temporary_oids,
1440                )?;
1441
1442                CatalogState::add_to_audit_log(
1443                    &state.system_configuration,
1444                    oracle_write_ts,
1445                    session,
1446                    tx,
1447                    audit_events,
1448                    EventType::Create,
1449                    ObjectType::NetworkPolicy,
1450                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1451                        id: id.to_string(),
1452                        name: name.clone(),
1453                    }),
1454                )?;
1455
1456                info!("created network policy {name} ({id})");
1457            }
1458            Op::Comment {
1459                object_id,
1460                sub_component,
1461                comment,
1462            } => {
                    // Writes the comment through `tx.update_comment` and, when a session with a
                    // connection id is available and the target should be audit-logged, appends
                    // a `Comment` audit event.
1463                tx.update_comment(object_id, sub_component, comment)?;
                    // `entry` is `None` when the commented object is not a catalog entry.
1464                let entry = state.get_comment_id_entry(&object_id);
1465                let should_log = entry
1466                    .map(|entry| Self::should_audit_log_item(entry.item()))
1467                    // Things that aren't catalog entries can't be temp, so should be logged.
1468                    .unwrap_or(true);
1469                // TODO: We need a conn_id to resolve schema names. This means that system-initiated
1470                // comments won't be logged for now.
1471                if let (Some(conn_id), true) =
1472                    (session.map(|session| session.conn_id()), should_log)
1473                {
1474                    CatalogState::add_to_audit_log(
1475                        &state.system_configuration,
1476                        oracle_write_ts,
1477                        session,
1478                        tx,
1479                        audit_events,
1480                        EventType::Comment,
1481                        comment_id_to_audit_object_type(object_id),
1482                        EventDetails::IdNameV1(IdNameV1 {
1483                            // CommentObjectIds don't have a great string representation, but debug will do for now.
1484                            id: format!("{object_id:?}"),
1485                            name: state.comment_id_to_audit_log_name(object_id, conn_id),
1486                        }),
1487                    )?;
1488                }
1489            }
1490            Op::UpdateSourceReferences {
1491                source_id,
1492                references,
1493            } => {
1494                tx.update_source_references(
1495                    source_id,
1496                    references
1497                        .references
1498                        .into_iter()
1499                        .map(|reference| reference.into())
1500                        .collect(),
1501                    references.updated_at,
1502                )?;
1503            }
1504            Op::DropObjects(drop_object_infos) => {
1505                // Generate all of the objects that need to get dropped.
1506                let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1507
1508                // Drop any associated comments.
1509                tx.drop_comments(&delta.comments)?;
1510
1511                // Drop any items.
1512                let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1513                    delta
1514                        .items
1515                        .iter()
1516                        .map(|id| id)
1517                        .partition(|id| !state.get_entry(*id).item().is_temporary());
1518                tx.remove_items(&durable_items_to_drop)?;
1519                temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1520                    let entry = state.get_entry(&id);
1521                    (entry.clone().into(), StateDiff::Retraction)
1522                }));
1523
1524                for item_id in delta.items {
1525                    let entry = state.get_entry(&item_id);
1526
1527                    if entry.item().is_storage_collection() {
1528                        storage_collections_to_drop.extend(entry.global_ids());
1529                    }
1530
1531                    if state.source_references.contains_key(&item_id) {
1532                        tx.remove_source_references(item_id)?;
1533                    }
1534
1535                    if Self::should_audit_log_item(entry.item()) {
1536                        CatalogState::add_to_audit_log(
1537                            &state.system_configuration,
1538                            oracle_write_ts,
1539                            session,
1540                            tx,
1541                            audit_events,
1542                            EventType::Drop,
1543                            catalog_type_to_audit_object_type(entry.item().typ()),
1544                            EventDetails::IdFullNameV1(IdFullNameV1 {
1545                                id: item_id.to_string(),
1546                                name: Self::full_name_detail(&state.resolve_full_name(
1547                                    entry.name(),
1548                                    session.map(|session| session.conn_id()),
1549                                )),
1550                            }),
1551                        )?;
1552                    }
1553                    info!(
1554                        "drop {} {} ({})",
1555                        entry.item_type(),
1556                        state.resolve_full_name(entry.name(), entry.conn_id()),
1557                        item_id
1558                    );
1559                }
1560
1561                // Drop any schemas.
1562                let schemas = delta
1563                    .schemas
1564                    .iter()
1565                    .map(|(schema_spec, database_spec)| {
1566                        (SchemaId::from(schema_spec), *database_spec)
1567                    })
1568                    .collect();
1569                tx.remove_schemas(&schemas)?;
1570
1571                for (schema_spec, database_spec) in delta.schemas {
1572                    let schema = state.get_schema(
1573                        &database_spec,
1574                        &schema_spec,
1575                        session
1576                            .map(|session| session.conn_id())
1577                            .unwrap_or(&SYSTEM_CONN_ID),
1578                    );
1579
1580                    let schema_id = SchemaId::from(schema_spec);
1581                    let database_id = match database_spec {
1582                        ResolvedDatabaseSpecifier::Ambient => None,
1583                        ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1584                    };
1585
1586                    CatalogState::add_to_audit_log(
1587                        &state.system_configuration,
1588                        oracle_write_ts,
1589                        session,
1590                        tx,
1591                        audit_events,
1592                        EventType::Drop,
1593                        ObjectType::Schema,
1594                        EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1595                            id: schema_id.to_string(),
1596                            name: schema.name.schema.to_string(),
1597                            database_name: database_id
1598                                .map(|database_id| state.database_by_id[&database_id].name.clone()),
1599                        }),
1600                    )?;
1601                }
1602
1603                // Drop any databases.
1604                tx.remove_databases(&delta.databases)?;
1605
1606                for database_id in delta.databases {
1607                    let database = state.get_database(&database_id).clone();
1608
1609                    CatalogState::add_to_audit_log(
1610                        &state.system_configuration,
1611                        oracle_write_ts,
1612                        session,
1613                        tx,
1614                        audit_events,
1615                        EventType::Drop,
1616                        ObjectType::Database,
1617                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1618                            id: database_id.to_string(),
1619                            name: database.name.clone(),
1620                        }),
1621                    )?;
1622                }
1623
1624                // Drop any roles.
1625                tx.remove_user_roles(&delta.roles)?;
1626
1627                for role_id in delta.roles {
1628                    let role = state
1629                        .roles_by_id
1630                        .get(&role_id)
1631                        .expect("catalog out of sync");
1632
1633                    CatalogState::add_to_audit_log(
1634                        &state.system_configuration,
1635                        oracle_write_ts,
1636                        session,
1637                        tx,
1638                        audit_events,
1639                        EventType::Drop,
1640                        ObjectType::Role,
1641                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1642                            id: role.id.to_string(),
1643                            name: role.name.clone(),
1644                        }),
1645                    )?;
1646                    info!("drop role {}", role.name());
1647                }
1648
1649                // Drop any network policies.
1650                tx.remove_network_policies(&delta.network_policies)?;
1651
1652                for network_policy_id in delta.network_policies {
1653                    let policy = state
1654                        .network_policies_by_id
1655                        .get(&network_policy_id)
1656                        .expect("catalog out of sync");
1657
1658                    CatalogState::add_to_audit_log(
1659                        &state.system_configuration,
1660                        oracle_write_ts,
1661                        session,
1662                        tx,
1663                        audit_events,
1664                        EventType::Drop,
1665                        ObjectType::NetworkPolicy,
1666                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1667                            id: policy.id.to_string(),
1668                            name: policy.name.clone(),
1669                        }),
1670                    )?;
1671                    info!("drop network policy {}", policy.name.clone());
1672                }
1673
1674                // Drop any replicas.
1675                let replicas = delta.replicas.keys().copied().collect();
1676                tx.remove_cluster_replicas(&replicas)?;
1677
1678                for (replica_id, (cluster_id, reason)) in delta.replicas {
1679                    let cluster = state.get_cluster(cluster_id);
1680                    let replica = cluster.replica(replica_id).expect("Must exist");
1681
1682                    let (reason, scheduling_policies) = reason.into_audit_log();
1683                    let details =
1684                        EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1685                            cluster_id: cluster_id.to_string(),
1686                            cluster_name: cluster.name.clone(),
1687                            replica_id: Some(replica_id.to_string()),
1688                            replica_name: replica.name.clone(),
1689                            reason,
1690                            scheduling_policies,
1691                        });
1692                    CatalogState::add_to_audit_log(
1693                        &state.system_configuration,
1694                        oracle_write_ts,
1695                        session,
1696                        tx,
1697                        audit_events,
1698                        EventType::Drop,
1699                        ObjectType::ClusterReplica,
1700                        details,
1701                    )?;
1702                }
1703
1704                // Drop any clusters.
1705                tx.remove_clusters(&delta.clusters)?;
1706
1707                for cluster_id in delta.clusters {
1708                    let cluster = state.get_cluster(cluster_id);
1709
1710                    CatalogState::add_to_audit_log(
1711                        &state.system_configuration,
1712                        oracle_write_ts,
1713                        session,
1714                        tx,
1715                        audit_events,
1716                        EventType::Drop,
1717                        ObjectType::Cluster,
1718                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1719                            id: cluster.id.to_string(),
1720                            name: cluster.name.clone(),
1721                        }),
1722                    )?;
1723                }
1724            }
1725            Op::GrantRole {
1726                role_id,
1727                member_id,
1728                grantor_id,
1729            } => {
1730                state.ensure_not_reserved_role(&member_id)?;
1731                state.ensure_grantable_role(&role_id)?;
1732                if state.collect_role_membership(&role_id).contains(&member_id) {
1733                    let group_role = state.get_role(&role_id);
1734                    let member_role = state.get_role(&member_id);
1735                    return Err(AdapterError::Catalog(Error::new(
1736                        ErrorKind::CircularRoleMembership {
1737                            role_name: group_role.name().to_string(),
1738                            member_name: member_role.name().to_string(),
1739                        },
1740                    )));
1741                }
1742                let mut member_role = state.get_role(&member_id).clone();
1743                member_role.membership.map.insert(role_id, grantor_id);
1744                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1745
1746                CatalogState::add_to_audit_log(
1747                    &state.system_configuration,
1748                    oracle_write_ts,
1749                    session,
1750                    tx,
1751                    audit_events,
1752                    EventType::Grant,
1753                    ObjectType::Role,
1754                    EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1755                        role_id: role_id.to_string(),
1756                        member_id: member_id.to_string(),
1757                        grantor_id: grantor_id.to_string(),
1758                        executed_by: session
1759                            .map(|session| session.authenticated_role_id())
1760                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1761                            .to_string(),
1762                    }),
1763                )?;
1764            }
1765            Op::RevokeRole {
1766                role_id,
1767                member_id,
1768                grantor_id,
1769            } => {
1770                state.ensure_not_reserved_role(&member_id)?;
1771                state.ensure_grantable_role(&role_id)?;
1772                let mut member_role = state.get_role(&member_id).clone();
1773                member_role.membership.map.remove(&role_id);
1774                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1775
1776                CatalogState::add_to_audit_log(
1777                    &state.system_configuration,
1778                    oracle_write_ts,
1779                    session,
1780                    tx,
1781                    audit_events,
1782                    EventType::Revoke,
1783                    ObjectType::Role,
1784                    EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1785                        role_id: role_id.to_string(),
1786                        member_id: member_id.to_string(),
1787                        grantor_id: grantor_id.to_string(),
1788                        executed_by: session
1789                            .map(|session| session.authenticated_role_id())
1790                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1791                            .to_string(),
1792                    }),
1793                )?;
1794            }
1795            Op::UpdatePrivilege {
1796                target_id,
1797                privilege,
1798                variant,
1799            } => {
1800                let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1801                    UpdatePrivilegeVariant::Grant => {
1802                        privileges.grant(privilege);
1803                    }
1804                    UpdatePrivilegeVariant::Revoke => {
1805                        privileges.revoke(&privilege);
1806                    }
1807                };
1808                match &target_id {
1809                    SystemObjectId::Object(object_id) => match object_id {
1810                        ObjectId::Cluster(id) => {
1811                            let mut cluster = state.get_cluster(*id).clone();
1812                            update_privilege_fn(&mut cluster.privileges);
1813                            tx.update_cluster(*id, cluster.into())?;
1814                        }
1815                        ObjectId::Database(id) => {
1816                            let mut database = state.get_database(id).clone();
1817                            update_privilege_fn(&mut database.privileges);
1818                            tx.update_database(*id, database.into())?;
1819                        }
1820                        ObjectId::NetworkPolicy(id) => {
1821                            let mut policy = state.get_network_policy(id).clone();
1822                            update_privilege_fn(&mut policy.privileges);
1823                            tx.update_network_policy(*id, policy.into())?;
1824                        }
1825                        ObjectId::Schema((database_spec, schema_spec)) => {
1826                            let schema_id = schema_spec.clone().into();
1827                            let mut schema = state
1828                                .get_schema(
1829                                    database_spec,
1830                                    schema_spec,
1831                                    session
1832                                        .map(|session| session.conn_id())
1833                                        .unwrap_or(&SYSTEM_CONN_ID),
1834                                )
1835                                .clone();
1836                            update_privilege_fn(&mut schema.privileges);
1837                            tx.update_schema(schema_id, schema.into())?;
1838                        }
1839                        ObjectId::Item(id) => {
1840                            let entry = state.get_entry(id);
1841                            let mut new_entry = entry.clone();
1842                            update_privilege_fn(&mut new_entry.privileges);
1843                            if !new_entry.item().is_temporary() {
1844                                tx.update_item(*id, new_entry.into())?;
1845                            } else {
1846                                temporary_item_updates
1847                                    .push((entry.clone().into(), StateDiff::Retraction));
1848                                temporary_item_updates
1849                                    .push((new_entry.into(), StateDiff::Addition));
1850                            }
1851                        }
1852                        ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
1853                    },
1854                    SystemObjectId::System => {
1855                        let mut system_privileges = state.system_privileges.clone();
1856                        update_privilege_fn(&mut system_privileges);
1857                        let new_privilege =
1858                            system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
1859                        tx.set_system_privilege(
1860                            privilege.grantee,
1861                            privilege.grantor,
1862                            new_privilege.map(|new_privilege| new_privilege.acl_mode),
1863                        )?;
1864                    }
1865                }
1866                let object_type = state.get_system_object_type(&target_id);
1867                let object_id_str = match &target_id {
1868                    SystemObjectId::System => "SYSTEM".to_string(),
1869                    SystemObjectId::Object(id) => id.to_string(),
1870                };
1871                CatalogState::add_to_audit_log(
1872                    &state.system_configuration,
1873                    oracle_write_ts,
1874                    session,
1875                    tx,
1876                    audit_events,
1877                    variant.into(),
1878                    system_object_type_to_audit_object_type(&object_type),
1879                    EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
1880                        object_id: object_id_str,
1881                        grantee_id: privilege.grantee.to_string(),
1882                        grantor_id: privilege.grantor.to_string(),
1883                        privileges: privilege.acl_mode.to_string(),
1884                    }),
1885                )?;
1886            }
1887            Op::UpdateDefaultPrivilege {
1888                privilege_object,
1889                privilege_acl_item,
1890                variant,
1891            } => {
1892                let mut default_privileges = state.default_privileges.clone();
1893                match variant {
1894                    UpdatePrivilegeVariant::Grant => default_privileges
1895                        .grant(privilege_object.clone(), privilege_acl_item.clone()),
1896                    UpdatePrivilegeVariant::Revoke => {
1897                        default_privileges.revoke(&privilege_object, &privilege_acl_item)
1898                    }
1899                }
1900                let new_acl_mode = default_privileges
1901                    .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
1902                tx.set_default_privilege(
1903                    privilege_object.role_id,
1904                    privilege_object.database_id,
1905                    privilege_object.schema_id,
1906                    privilege_object.object_type,
1907                    privilege_acl_item.grantee,
1908                    new_acl_mode.cloned(),
1909                )?;
1910                CatalogState::add_to_audit_log(
1911                    &state.system_configuration,
1912                    oracle_write_ts,
1913                    session,
1914                    tx,
1915                    audit_events,
1916                    variant.into(),
1917                    object_type_to_audit_object_type(privilege_object.object_type),
1918                    EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
1919                        role_id: privilege_object.role_id.to_string(),
1920                        database_id: privilege_object.database_id.map(|id| id.to_string()),
1921                        schema_id: privilege_object.schema_id.map(|id| id.to_string()),
1922                        grantee_id: privilege_acl_item.grantee.to_string(),
1923                        privileges: privilege_acl_item.acl_mode.to_string(),
1924                    }),
1925                )?;
1926            }
1927            Op::RenameCluster {
1928                id,
1929                name,
1930                to_name,
1931                check_reserved_names,
1932            } => {
1933                if id.is_system() {
1934                    return Err(AdapterError::Catalog(Error::new(
1935                        ErrorKind::ReadOnlyCluster(name.clone()),
1936                    )));
1937                }
1938                if check_reserved_names && is_reserved_name(&to_name) {
1939                    return Err(AdapterError::Catalog(Error::new(
1940                        ErrorKind::ReservedClusterName(to_name),
1941                    )));
1942                }
1943                tx.rename_cluster(id, &name, &to_name)?;
1944                CatalogState::add_to_audit_log(
1945                    &state.system_configuration,
1946                    oracle_write_ts,
1947                    session,
1948                    tx,
1949                    audit_events,
1950                    EventType::Alter,
1951                    ObjectType::Cluster,
1952                    EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
1953                        id: id.to_string(),
1954                        old_name: name.clone(),
1955                        new_name: to_name.clone(),
1956                    }),
1957                )?;
1958                info!("rename cluster {name} to {to_name}");
1959            }
1960            Op::RenameClusterReplica {
1961                cluster_id,
1962                replica_id,
1963                name,
1964                to_name,
1965            } => {
1966                if is_reserved_name(&to_name) {
1967                    return Err(AdapterError::Catalog(Error::new(
1968                        ErrorKind::ReservedReplicaName(to_name),
1969                    )));
1970                }
1971                tx.rename_cluster_replica(replica_id, &name, &to_name)?;
1972                CatalogState::add_to_audit_log(
1973                    &state.system_configuration,
1974                    oracle_write_ts,
1975                    session,
1976                    tx,
1977                    audit_events,
1978                    EventType::Alter,
1979                    ObjectType::ClusterReplica,
1980                    EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
1981                        cluster_id: cluster_id.to_string(),
1982                        replica_id: replica_id.to_string(),
1983                        old_name: name.replica.as_str().to_string(),
1984                        new_name: to_name.clone(),
1985                    }),
1986                )?;
1987                info!("rename cluster replica {name} to {to_name}");
1988            }
1989            Op::RenameItem {
1990                id,
1991                to_name,
1992                current_full_name,
1993            } => {
1994                let mut updates = Vec::new();
1995
1996                let entry = state.get_entry(&id);
1997                if let CatalogItem::Type(_) = entry.item() {
1998                    return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
1999                        current_full_name.to_string(),
2000                    ))));
2001                }
2002
2003                if entry.id().is_system() {
2004                    let name = state
2005                        .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2006                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2007                        name.to_string(),
2008                    ))));
2009                }
2010
2011                let mut to_full_name = current_full_name.clone();
2012                to_full_name.item.clone_from(&to_name);
2013
2014                let mut to_qualified_name = entry.name().clone();
2015                to_qualified_name.item.clone_from(&to_name);
2016
2017                let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2018                    id: id.to_string(),
2019                    old_name: Self::full_name_detail(&current_full_name),
2020                    new_name: Self::full_name_detail(&to_full_name),
2021                });
2022                if Self::should_audit_log_item(entry.item()) {
2023                    CatalogState::add_to_audit_log(
2024                        &state.system_configuration,
2025                        oracle_write_ts,
2026                        session,
2027                        tx,
2028                        audit_events,
2029                        EventType::Alter,
2030                        catalog_type_to_audit_object_type(entry.item().typ()),
2031                        details,
2032                    )?;
2033                }
2034
2035                // Rename item itself.
2036                let mut new_entry = entry.clone();
2037                new_entry.name.item.clone_from(&to_name);
2038                new_entry.item = entry
2039                    .item()
2040                    .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2041                    .map_err(|e| {
2042                        Error::new(ErrorKind::from(AmbiguousRename {
2043                            depender: state
2044                                .resolve_full_name(entry.name(), entry.conn_id())
2045                                .to_string(),
2046                            dependee: state
2047                                .resolve_full_name(entry.name(), entry.conn_id())
2048                                .to_string(),
2049                            message: e,
2050                        }))
2051                    })?;
2052
2053                for id in entry.referenced_by() {
2054                    let dependent_item = state.get_entry(id);
2055                    let mut to_entry = dependent_item.clone();
2056                    to_entry.item = dependent_item
2057                        .item()
2058                        .rename_item_refs(
2059                            current_full_name.clone(),
2060                            to_full_name.item.clone(),
2061                            false,
2062                        )
2063                        .map_err(|e| {
2064                            Error::new(ErrorKind::from(AmbiguousRename {
2065                                depender: state
2066                                    .resolve_full_name(
2067                                        dependent_item.name(),
2068                                        dependent_item.conn_id(),
2069                                    )
2070                                    .to_string(),
2071                                dependee: state
2072                                    .resolve_full_name(entry.name(), entry.conn_id())
2073                                    .to_string(),
2074                                message: e,
2075                            }))
2076                        })?;
2077
2078                    if !to_entry.item().is_temporary() {
2079                        tx.update_item(*id, to_entry.into())?;
2080                    } else {
2081                        temporary_item_updates
2082                            .push((dependent_item.clone().into(), StateDiff::Retraction));
2083                        temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2084                    }
2085                    updates.push(*id);
2086                }
2087                if !new_entry.item().is_temporary() {
2088                    tx.update_item(id, new_entry.into())?;
2089                } else {
2090                    temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2091                    temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2092                }
2093
2094                updates.push(id);
2095                for id in updates {
2096                    Self::log_update(state, &id);
2097                }
2098            }
2099            Op::RenameSchema {
2100                database_spec,
2101                schema_spec,
2102                new_name,
2103                check_reserved_names,
2104            } => {
2105                if check_reserved_names && is_reserved_name(&new_name) {
2106                    return Err(AdapterError::Catalog(Error::new(
2107                        ErrorKind::ReservedSchemaName(new_name),
2108                    )));
2109                }
2110
2111                let conn_id = session
2112                    .map(|session| session.conn_id())
2113                    .unwrap_or(&SYSTEM_CONN_ID);
2114
2115                let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2116                let cur_name = schema.name().schema.clone();
2117
2118                let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2119                    return Err(AdapterError::Catalog(Error::new(
2120                        ErrorKind::AmbientSchemaRename(cur_name),
2121                    )));
2122                };
2123                let database = state.get_database(&database_id);
2124                let database_name = &database.name;
2125
2126                let mut updates = Vec::new();
2127                let mut items_to_update = BTreeMap::new();
2128
2129                let mut update_item = |id| {
2130                    if items_to_update.contains_key(id) {
2131                        return Ok(());
2132                    }
2133
2134                    let entry = state.get_entry(id);
2135
2136                    // Update our item.
2137                    let mut new_entry = entry.clone();
2138                    new_entry.item = entry
2139                        .item
2140                        .rename_schema_refs(database_name, &cur_name, &new_name)
2141                        .map_err(|(s, _i)| {
2142                            Error::new(ErrorKind::from(AmbiguousRename {
2143                                depender: state
2144                                    .resolve_full_name(entry.name(), entry.conn_id())
2145                                    .to_string(),
2146                                dependee: format!("{database_name}.{cur_name}"),
2147                                message: format!("ambiguous reference to schema named {s}"),
2148                            }))
2149                        })?;
2150
2151                    // Queue updates for Catalog storage and Builtin Tables.
2152                    if !new_entry.item().is_temporary() {
2153                        items_to_update.insert(*id, new_entry.into());
2154                    } else {
2155                        temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2156                        temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2157                    }
2158                    updates.push(id);
2159
2160                    Ok::<_, AdapterError>(())
2161                };
2162
2163                // Update all of the items in the schema.
2164                for (_name, item_id) in &schema.items {
2165                    // Update the item itself.
2166                    update_item(item_id)?;
2167
2168                    // Update everything that depends on this item.
2169                    for id in state.get_entry(item_id).referenced_by() {
2170                        update_item(id)?;
2171                    }
2172                }
2173                // Note: When updating the transaction it's very important that we update the
2174                // items as a whole group, otherwise we exhibit quadratic behavior.
2175                tx.update_items(items_to_update)?;
2176
2177                // Renaming temporary schemas is not supported.
2178                let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2179                    let schema_name = schema.name().schema.clone();
2180                    return Err(AdapterError::Catalog(crate::catalog::Error::new(
2181                        crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2182                    )));
2183                };
2184
2185                // Add an entry to the audit log.
2186                let database_name = database_spec
2187                    .id()
2188                    .map(|id| state.get_database(&id).name.clone());
2189                let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2190                    id: schema_id.to_string(),
2191                    old_name: schema.name().schema.clone(),
2192                    new_name: new_name.clone(),
2193                    database_name,
2194                });
2195                CatalogState::add_to_audit_log(
2196                    &state.system_configuration,
2197                    oracle_write_ts,
2198                    session,
2199                    tx,
2200                    audit_events,
2201                    EventType::Alter,
2202                    mz_audit_log::ObjectType::Schema,
2203                    details,
2204                )?;
2205
2206                // Update the schema itself.
2207                let mut new_schema = schema.clone();
2208                new_schema.name.schema.clone_from(&new_name);
2209                tx.update_schema(schema_id, new_schema.into())?;
2210
2211                for id in updates {
2212                    Self::log_update(state, id);
2213                }
2214            }
2215            Op::UpdateOwner { id, new_owner } => {
2216                let conn_id = session
2217                    .map(|session| session.conn_id())
2218                    .unwrap_or(&SYSTEM_CONN_ID);
2219                let old_owner = state
2220                    .get_owner_id(&id, conn_id)
2221                    .expect("cannot update the owner of an object without an owner");
2222                match &id {
2223                    ObjectId::Cluster(id) => {
2224                        let mut cluster = state.get_cluster(*id).clone();
2225                        if id.is_system() {
2226                            return Err(AdapterError::Catalog(Error::new(
2227                                ErrorKind::ReadOnlyCluster(cluster.name),
2228                            )));
2229                        }
2230                        Self::update_privilege_owners(
2231                            &mut cluster.privileges,
2232                            cluster.owner_id,
2233                            new_owner,
2234                        );
2235                        cluster.owner_id = new_owner;
2236                        tx.update_cluster(*id, cluster.into())?;
2237                    }
2238                    ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2239                        let cluster = state.get_cluster(*cluster_id);
2240                        let mut replica = cluster
2241                            .replica(*replica_id)
2242                            .expect("catalog out of sync")
2243                            .clone();
2244                        if replica_id.is_system() {
2245                            return Err(AdapterError::Catalog(Error::new(
2246                                ErrorKind::ReadOnlyClusterReplica(replica.name),
2247                            )));
2248                        }
2249                        replica.owner_id = new_owner;
2250                        tx.update_cluster_replica(*replica_id, replica.into())?;
2251                    }
2252                    ObjectId::Database(id) => {
2253                        let mut database = state.get_database(id).clone();
2254                        if id.is_system() {
2255                            return Err(AdapterError::Catalog(Error::new(
2256                                ErrorKind::ReadOnlyDatabase(database.name),
2257                            )));
2258                        }
2259                        Self::update_privilege_owners(
2260                            &mut database.privileges,
2261                            database.owner_id,
2262                            new_owner,
2263                        );
2264                        database.owner_id = new_owner;
2265                        tx.update_database(*id, database.clone().into())?;
2266                    }
2267                    ObjectId::Schema((database_spec, schema_spec)) => {
2268                        let schema_id: SchemaId = schema_spec.clone().into();
2269                        let mut schema = state
2270                            .get_schema(database_spec, schema_spec, conn_id)
2271                            .clone();
2272                        if schema_id.is_system() {
2273                            let name = schema.name();
2274                            let full_name = state.resolve_full_schema_name(name);
2275                            return Err(AdapterError::Catalog(Error::new(
2276                                ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2277                            )));
2278                        }
2279                        Self::update_privilege_owners(
2280                            &mut schema.privileges,
2281                            schema.owner_id,
2282                            new_owner,
2283                        );
2284                        schema.owner_id = new_owner;
2285                        tx.update_schema(schema_id, schema.into())?;
2286                    }
2287                    ObjectId::Item(id) => {
2288                        let entry = state.get_entry(id);
2289                        let mut new_entry = entry.clone();
2290                        if id.is_system() {
2291                            let full_name = state.resolve_full_name(
2292                                new_entry.name(),
2293                                session.map(|session| session.conn_id()),
2294                            );
2295                            return Err(AdapterError::Catalog(Error::new(
2296                                ErrorKind::ReadOnlyItem(full_name.to_string()),
2297                            )));
2298                        }
2299                        Self::update_privilege_owners(
2300                            &mut new_entry.privileges,
2301                            new_entry.owner_id,
2302                            new_owner,
2303                        );
2304                        new_entry.owner_id = new_owner;
2305                        if !new_entry.item().is_temporary() {
2306                            tx.update_item(*id, new_entry.into())?;
2307                        } else {
2308                            temporary_item_updates
2309                                .push((entry.clone().into(), StateDiff::Retraction));
2310                            temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2311                        }
2312                    }
2313                    ObjectId::NetworkPolicy(id) => {
2314                        let mut policy = state.get_network_policy(id).clone();
2315                        if id.is_system() {
2316                            return Err(AdapterError::Catalog(Error::new(
2317                                ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2318                            )));
2319                        }
2320                        Self::update_privilege_owners(
2321                            &mut policy.privileges,
2322                            policy.owner_id,
2323                            new_owner,
2324                        );
2325                        policy.owner_id = new_owner;
2326                        tx.update_network_policy(*id, policy.into())?;
2327                    }
2328                    ObjectId::Role(_) => unreachable!("roles have no owner"),
2329                }
2330                let object_type = state.get_object_type(&id);
2331                CatalogState::add_to_audit_log(
2332                    &state.system_configuration,
2333                    oracle_write_ts,
2334                    session,
2335                    tx,
2336                    audit_events,
2337                    EventType::Alter,
2338                    object_type_to_audit_object_type(object_type),
2339                    EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2340                        object_id: id.to_string(),
2341                        old_owner_id: old_owner.to_string(),
2342                        new_owner_id: new_owner.to_string(),
2343                    }),
2344                )?;
2345            }
2346            Op::UpdateClusterConfig { id, name, config } => {
2347                let mut cluster = state.get_cluster(id).clone();
2348                cluster.config = config;
2349                tx.update_cluster(id, cluster.into())?;
2350                info!("update cluster {}", name);
2351
2352                CatalogState::add_to_audit_log(
2353                    &state.system_configuration,
2354                    oracle_write_ts,
2355                    session,
2356                    tx,
2357                    audit_events,
2358                    EventType::Alter,
2359                    ObjectType::Cluster,
2360                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2361                        id: id.to_string(),
2362                        name,
2363                    }),
2364                )?;
2365            }
2366            Op::UpdateClusterReplicaConfig {
2367                replica_id,
2368                cluster_id,
2369                config,
2370            } => {
2371                let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2372                info!("update replica {}", replica.name);
2373                tx.update_cluster_replica(
2374                    replica_id,
2375                    mz_catalog::durable::ClusterReplica {
2376                        cluster_id,
2377                        replica_id,
2378                        name: replica.name.clone(),
2379                        config: config.clone().into(),
2380                        owner_id: replica.owner_id,
2381                    },
2382                )?;
2383            }
2384            Op::UpdateItem { id, name, to_item } => {
2385                let mut entry = state.get_entry(&id).clone();
2386                entry.name = name.clone();
2387                entry.item = to_item.clone();
2388                tx.update_item(id, entry.into())?;
2389
2390                if Self::should_audit_log_item(&to_item) {
2391                    let mut full_name = Self::full_name_detail(
2392                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2393                    );
2394                    full_name.item = name.item;
2395
2396                    CatalogState::add_to_audit_log(
2397                        &state.system_configuration,
2398                        oracle_write_ts,
2399                        session,
2400                        tx,
2401                        audit_events,
2402                        EventType::Alter,
2403                        catalog_type_to_audit_object_type(to_item.typ()),
2404                        EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2405                            id: id.to_string(),
2406                            name: full_name,
2407                        }),
2408                    )?;
2409                }
2410
2411                Self::log_update(state, &id);
2412            }
2413            Op::UpdateSystemConfiguration { name, value } => {
2414                let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2415                tx.upsert_system_config(&name, parsed_value.clone())?;
2416                // This mirrors some "system vars" into the catalog storage
2417                // "config" collection so that we can toggle the flag with
2418                // Launch Darkly, but use it in boot before Launch Darkly is
2419                // available.
2420                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2421                    let with_0dt_deployment_max_wait =
2422                        Duration::parse(VarInput::Flat(&parsed_value))
2423                            .expect("parsing succeeded above");
2424                    tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2425                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2426                    let with_0dt_deployment_ddl_check_interval =
2427                        Duration::parse(VarInput::Flat(&parsed_value))
2428                            .expect("parsing succeeded above");
2429                    tx.set_0dt_deployment_ddl_check_interval(
2430                        with_0dt_deployment_ddl_check_interval,
2431                    )?;
2432                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2433                    let panic_after_timeout =
2434                        strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2435                    tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2436                }
2437
2438                CatalogState::add_to_audit_log(
2439                    &state.system_configuration,
2440                    oracle_write_ts,
2441                    session,
2442                    tx,
2443                    audit_events,
2444                    EventType::Alter,
2445                    ObjectType::System,
2446                    EventDetails::SetV1(mz_audit_log::SetV1 {
2447                        name,
2448                        value: Some(value.borrow().to_vec().join(", ")),
2449                    }),
2450                )?;
2451            }
2452            Op::ResetSystemConfiguration { name } => {
2453                tx.remove_system_config(&name);
2454                // This mirrors some "system vars" into the catalog storage
2455                // "config" collection so that we can toggle the flag with
2456                // Launch Darkly, but use it in boot before Launch Darkly is
2457                // available.
2458                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2459                    tx.reset_0dt_deployment_max_wait()?;
2460                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2461                    tx.reset_0dt_deployment_ddl_check_interval()?;
2462                }
2463
2464                CatalogState::add_to_audit_log(
2465                    &state.system_configuration,
2466                    oracle_write_ts,
2467                    session,
2468                    tx,
2469                    audit_events,
2470                    EventType::Alter,
2471                    ObjectType::System,
2472                    EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2473                )?;
2474            }
2475            Op::ResetAllSystemConfiguration => {
2476                tx.clear_system_configs();
2477                tx.reset_0dt_deployment_max_wait()?;
2478                tx.reset_0dt_deployment_ddl_check_interval()?;
2479
2480                CatalogState::add_to_audit_log(
2481                    &state.system_configuration,
2482                    oracle_write_ts,
2483                    session,
2484                    tx,
2485                    audit_events,
2486                    EventType::Alter,
2487                    ObjectType::System,
2488                    EventDetails::ResetAllV1,
2489                )?;
2490            }
2491            Op::WeirdStorageUsageUpdates {
2492                object_id,
2493                size_bytes,
2494                collection_timestamp,
2495            } => {
2496                let id = tx.allocate_storage_usage_ids()?;
2497                let metric =
2498                    VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2499                let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2500                let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2501                weird_builtin_table_update = Some(builtin_table_update);
2502            }
2503        };
2504        Ok((weird_builtin_table_update, temporary_item_updates))
2505    }
2506
2507    fn log_update(state: &CatalogState, id: &CatalogItemId) {
2508        let entry = state.get_entry(id);
2509        info!(
2510            "update {} {} ({})",
2511            entry.item_type(),
2512            state.resolve_full_name(entry.name(), entry.conn_id()),
2513            id
2514        );
2515    }
2516
2517    /// Update privileges to reflect the new owner. Based off of PostgreSQL's
2518    /// implementation:
2519    /// <https://github.com/postgres/postgres/blob/43a33ef54e503b61f269d088f2623ba3b9484ad7/src/backend/utils/adt/acl.c#L1078-L1177>
2520    fn update_privilege_owners(
2521        privileges: &mut PrivilegeMap,
2522        old_owner: RoleId,
2523        new_owner: RoleId,
2524    ) {
2525        // TODO(jkosh44) Would be nice not to clone every privilege.
2526        let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2527
2528        let mut new_present = false;
2529        for privilege in flat_privileges.iter_mut() {
2530            // Old owner's granted privilege are updated to be granted by the new
2531            // owner.
2532            if privilege.grantor == old_owner {
2533                privilege.grantor = new_owner;
2534            } else if privilege.grantor == new_owner {
2535                new_present = true;
2536            }
2537            // Old owner's privileges is given to the new owner.
2538            if privilege.grantee == old_owner {
2539                privilege.grantee = new_owner;
2540            } else if privilege.grantee == new_owner {
2541                new_present = true;
2542            }
2543        }
2544
2545        // If the old privilege list contained references to the new owner, we may
2546        // have created duplicate entries. Here we try and consolidate them. This
2547        // is inspired by PostgreSQL's algorithm but not identical.
2548        if new_present {
2549            // Group privileges by (grantee, grantor).
2550            let privilege_map: BTreeMap<_, Vec<_>> =
2551                flat_privileges
2552                    .into_iter()
2553                    .fold(BTreeMap::new(), |mut accum, privilege| {
2554                        accum
2555                            .entry((privilege.grantee, privilege.grantor))
2556                            .or_default()
2557                            .push(privilege);
2558                        accum
2559                    });
2560
2561            // Consolidate and update all privileges.
2562            flat_privileges = privilege_map
2563                .into_iter()
2564                .map(|((grantee, grantor), values)|
2565                    // Combine the acl_mode of all mz_aclitems with the same grantee and grantor.
2566                    values.into_iter().fold(
2567                        MzAclItem::empty(grantee, grantor),
2568                        |mut accum, mz_aclitem| {
2569                            accum.acl_mode =
2570                                accum.acl_mode.union(mz_aclitem.acl_mode);
2571                            accum
2572                        },
2573                    ))
2574                .collect();
2575        }
2576
2577        *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2578    }
2579}
2580
2581/// Generate audit events for a replacement apply operation.
2582fn apply_replacement_audit_events(
2583    state: &CatalogState,
2584    target: &CatalogEntry,
2585    replacement: &CatalogEntry,
2586) -> Vec<(EventType, EventDetails)> {
2587    let mut events = Vec::new();
2588
2589    let target_name = state.resolve_full_name(target.name(), target.conn_id());
2590    let target_id_name = IdFullNameV1 {
2591        id: target.id().to_string(),
2592        name: Catalog::full_name_detail(&target_name),
2593    };
2594    let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2595    let replacement_id_name = IdFullNameV1 {
2596        id: replacement.id().to_string(),
2597        name: Catalog::full_name_detail(&replacement_name),
2598    };
2599
2600    if Catalog::should_audit_log_item(&target.item) {
2601        events.push((
2602            EventType::Alter,
2603            EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2604                target: target_id_name.clone(),
2605                replacement: replacement_id_name.clone(),
2606            }),
2607        ));
2608
2609        if let Some(old_cluster_id) = target.cluster_id()
2610            && let Some(new_cluster_id) = replacement.cluster_id()
2611            && old_cluster_id != new_cluster_id
2612        {
2613            events.push((
2614                EventType::Alter,
2615                EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
2616                    id: target.id().to_string(),
2617                    name: target_id_name.name,
2618                    old_cluster_id: old_cluster_id.to_string(),
2619                    new_cluster_id: new_cluster_id.to_string(),
2620                }),
2621            ));
2622        }
2623    }
2624
2625    if Catalog::should_audit_log_item(&replacement.item) {
2626        events.push((
2627            EventType::Drop,
2628            EventDetails::IdFullNameV1(replacement_id_name),
2629        ));
2630    }
2631
2632    events
2633}
2634
2635/// All of the objects that need to be removed in response to an [`Op::DropObjects`].
2636///
2637/// Note: Previously we used to omit a single `Op::DropObject` for every object
2638/// we needed to drop. But removing a batch of objects from a durable Catalog
2639/// Transaction is O(n) where `n` is the number of objects that exist in the
2640/// Catalog. This resulted in an unacceptable `O(m * n)` performance for a
2641/// `DROP ... CASCADE` statement.
2642#[derive(Debug, Default)]
2643pub(crate) struct ObjectsToDrop {
2644    pub comments: BTreeSet<CommentObjectId>,
2645    pub databases: BTreeSet<DatabaseId>,
2646    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2647    pub clusters: BTreeSet<ClusterId>,
2648    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2649    pub roles: BTreeSet<RoleId>,
2650    pub items: Vec<CatalogItemId>,
2651    pub network_policies: BTreeSet<NetworkPolicyId>,
2652}
2653
2654impl ObjectsToDrop {
2655    pub fn generate(
2656        drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2657        state: &CatalogState,
2658        session: Option<&ConnMeta>,
2659    ) -> Result<Self, AdapterError> {
2660        let mut delta = ObjectsToDrop::default();
2661
2662        for drop_object_info in drop_object_infos {
2663            delta.add_item(drop_object_info, state, session)?;
2664        }
2665
2666        Ok(delta)
2667    }
2668
2669    fn add_item(
2670        &mut self,
2671        drop_object_info: DropObjectInfo,
2672        state: &CatalogState,
2673        session: Option<&ConnMeta>,
2674    ) -> Result<(), AdapterError> {
2675        self.comments
2676            .insert(state.get_comment_id(drop_object_info.to_object_id()));
2677
2678        match drop_object_info {
2679            DropObjectInfo::Database(database_id) => {
2680                let database = &state.database_by_id[&database_id];
2681                if database_id.is_system() {
2682                    return Err(AdapterError::Catalog(Error::new(
2683                        ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2684                    )));
2685                }
2686
2687                self.databases.insert(database_id);
2688            }
2689            DropObjectInfo::Schema((database_spec, schema_spec)) => {
2690                let schema = state.get_schema(
2691                    &database_spec,
2692                    &schema_spec,
2693                    session
2694                        .map(|session| session.conn_id())
2695                        .unwrap_or(&SYSTEM_CONN_ID),
2696                );
2697                let schema_id: SchemaId = schema_spec.into();
2698                if schema_id.is_system() {
2699                    let name = schema.name();
2700                    let full_name = state.resolve_full_schema_name(name);
2701                    return Err(AdapterError::Catalog(Error::new(
2702                        ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2703                    )));
2704                }
2705
2706                self.schemas.insert(schema_spec, database_spec);
2707            }
2708            DropObjectInfo::Role(role_id) => {
2709                let name = state.get_role(&role_id).name().to_string();
2710                if role_id.is_system() || role_id.is_predefined() {
2711                    return Err(AdapterError::Catalog(Error::new(
2712                        ErrorKind::ReservedRoleName(name.clone()),
2713                    )));
2714                }
2715                state.ensure_not_reserved_role(&role_id)?;
2716
2717                self.roles.insert(role_id);
2718            }
2719            DropObjectInfo::Cluster(cluster_id) => {
2720                let cluster = state.get_cluster(cluster_id);
2721                let name = &cluster.name;
2722                if cluster_id.is_system() {
2723                    return Err(AdapterError::Catalog(Error::new(
2724                        ErrorKind::ReadOnlyCluster(name.clone()),
2725                    )));
2726                }
2727
2728                self.clusters.insert(cluster_id);
2729            }
2730            DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2731                let cluster = state.get_cluster(cluster_id);
2732                let replica = cluster.replica(replica_id).expect("Must exist");
2733
2734                self.replicas
2735                    .insert(replica.replica_id, (cluster.id, reason));
2736            }
2737            DropObjectInfo::Item(item_id) => {
2738                let entry = state.get_entry(&item_id);
2739                if item_id.is_system() {
2740                    let name = entry.name();
2741                    let full_name =
2742                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
2743                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2744                        full_name.to_string(),
2745                    ))));
2746                }
2747
2748                self.items.push(item_id);
2749            }
2750            DropObjectInfo::NetworkPolicy(network_policy_id) => {
2751                let policy = state.get_network_policy(&network_policy_id);
2752                let name = &policy.name;
2753                if network_policy_id.is_system() {
2754                    return Err(AdapterError::Catalog(Error::new(
2755                        ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
2756                    )));
2757                }
2758
2759                self.network_policies.insert(network_policy_id);
2760            }
2761        }
2762
2763        Ok(())
2764    }
2765}
2766
#[cfg(test)]
mod tests {
    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
    use mz_repr::role_id::RoleId;

    use crate::catalog::Catalog;

    /// Checks `Catalog::update_privilege_owners` against three starting
    /// privilege sets, each holding one `UPDATE` entry that mentions the old
    /// owner and one `SELECT` entry that mentions the new owner.
    ///
    /// In every scenario the test expects exactly one entry to remain after
    /// the call: it names the new owner wherever the old owner appeared and
    /// carries the union of `SELECT` and `UPDATE`.
    #[mz_ore::test]
    fn test_update_privilege_owners() {
        let old_owner = RoleId::User(1);
        let new_owner = RoleId::User(2);
        let other_role = RoleId::User(3);

        // Shorthand for spelling out a single ACL entry.
        let grant = |grantee: RoleId, grantor: RoleId, acl_mode: AclMode| MzAclItem {
            grantee,
            grantor,
            acl_mode,
        };

        // Builds a privilege map from `initial`, moves ownership from
        // `old_owner` to `new_owner`, and asserts that `expected` is the only
        // entry left afterwards.
        let check = |initial: Vec<MzAclItem>, expected: MzAclItem| {
            let mut privileges = PrivilegeMap::from_mz_acl_items(initial);
            Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
            assert_eq!(1, privileges.all_values().count());
            assert_eq!(
                vec![expected],
                privileges.all_values_owned().collect::<Vec<_>>()
            );
        };

        // Scenario 1: the old owner appears only as a grantor.
        check(
            vec![
                grant(other_role, old_owner, AclMode::UPDATE),
                grant(other_role, new_owner, AclMode::SELECT),
            ],
            grant(
                other_role,
                new_owner,
                AclMode::SELECT.union(AclMode::UPDATE),
            ),
        );

        // Scenario 2: the old owner appears only as a grantee.
        check(
            vec![
                grant(old_owner, other_role, AclMode::UPDATE),
                grant(new_owner, other_role, AclMode::SELECT),
            ],
            grant(
                new_owner,
                other_role,
                AclMode::SELECT.union(AclMode::UPDATE),
            ),
        );

        // Scenario 3: the old owner appears as both grantee and grantor.
        check(
            vec![
                grant(old_owner, old_owner, AclMode::UPDATE),
                grant(new_owner, new_owner, AclMode::SELECT),
            ],
            grant(
                new_owner,
                new_owner,
                AclMode::SELECT.union(AclMode::UPDATE),
            ),
        );
    }
}