// File: src/adapter/src/catalog/transact.rs (crate mz_adapter)
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to executing catalog transactions.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22    WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25    CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26    ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Transaction};
31use mz_catalog::expr_cache::LocalExpressions;
32use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
33use mz_catalog::memory::objects::{
34    CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, DefaultPrivileges, SourceReferences,
35    StateDiff, StateUpdate, StateUpdateKind, TemporaryItem,
36};
37use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
38use mz_controller_types::{ClusterId, ReplicaId};
39use mz_ore::collections::HashSet;
40use mz_ore::instrument;
41use mz_ore::now::EpochMillis;
42use mz_persist_types::ShardId;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
44use mz_repr::network_policy_id::NetworkPolicyId;
45use mz_repr::optimize::OptimizerFeatures;
46use mz_repr::role_id::RoleId;
47use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
48use mz_sql::ast::RawDataType;
49use mz_sql::catalog::{
50    CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
51    CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, PasswordAction, PasswordConfig,
52    RoleAttributesRaw, RoleMembership, RoleVars,
53};
54use mz_sql::names::{
55    CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
56    ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
57};
58use mz_sql::plan::{NetworkPolicyRule, PlanError};
59use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
60use mz_sql::session::vars::OwnedVarInput;
61use mz_sql::session::vars::{Value as VarValue, VarInput};
62use mz_sql::{DEFAULT_SCHEMA, rbac};
63use mz_sql_parser::ast::{QualifiedReplica, Value};
64use mz_storage_client::storage_collections::StorageCollections;
65use tracing::{info, trace};
66
67use crate::AdapterError;
68use crate::catalog::state::LocalExpressionCache;
69use crate::catalog::{
70    BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
71    catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
72    is_reserved_role_name, object_type_to_audit_object_type,
73    system_object_type_to_audit_object_type,
74};
75use crate::coord::ConnMeta;
76use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
77use crate::coord::cluster_scheduling::SchedulingDecision;
78use crate::util::ResultExt;
79
/// A single operation within a catalog transaction.
///
/// A `Vec<Op>` is passed to [`Catalog::transact`], which applies the
/// operations in order against a single durable catalog transaction, so they
/// commit (or fail) atomically.
#[derive(Debug, Clone)]
pub enum Op {
    /// Changes the retain history (compaction window) of the item `id`.
    AlterRetainHistory {
        id: CatalogItemId,
        /// Raw SQL value for the new retain history, if one was provided.
        value: Option<Value>,
        window: CompactionWindow,
    },
    /// Changes the timestamp interval of the source `id`.
    AlterSourceTimestampInterval {
        id: CatalogItemId,
        /// Raw SQL value for the new interval, if one was provided.
        value: Option<Value>,
        interval: Duration,
    },
    /// Alters a role's attributes, password, and/or session variable defaults.
    AlterRole {
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        // NOTE(review): presumably requests clearing the role's password —
        // confirm against the op's handler, which is outside this view.
        nopassword: bool,
        vars: RoleVars,
    },
    /// Replaces the rules (and name/owner) of an existing network policy.
    AlterNetworkPolicy {
        id: NetworkPolicyId,
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    /// Adds a column to the item `id`, allocating `new_global_id` for the
    /// resulting new version of the item.
    AlterAddColumn {
        id: CatalogItemId,
        new_global_id: GlobalId,
        name: ColumnName,
        /// Planned column type.
        typ: SqlColumnType,
        /// The raw, as-written SQL type, kept for round-tripping the DDL.
        sql: RawDataType,
    },
    /// Applies the pending replacement `replacement_id` to the materialized
    /// view `id`.
    AlterMaterializedViewApplyReplacement {
        id: CatalogItemId,
        replacement_id: CatalogItemId,
    },
    /// Creates a database owned by `owner_id`.
    CreateDatabase {
        name: String,
        owner_id: RoleId,
    },
    /// Creates a schema in the given database.
    CreateSchema {
        database_id: ResolvedDatabaseSpecifier,
        schema_name: String,
        owner_id: RoleId,
    },
    /// Creates a role with the given attributes.
    CreateRole {
        name: String,
        attributes: RoleAttributesRaw,
    },
    /// Creates a cluster, including its builtin introspection log sources.
    CreateCluster {
        id: ClusterId,
        name: String,
        introspection_sources: Vec<&'static BuiltinLog>,
        owner_id: RoleId,
        config: ClusterConfig,
    },
    /// Creates a replica of an existing cluster.
    CreateClusterReplica {
        cluster_id: ClusterId,
        name: String,
        config: ReplicaConfig,
        owner_id: RoleId,
        /// Why the replica is being created; forwarded to the audit log.
        reason: ReplicaCreateDropReason,
    },
    /// Creates a catalog item (table, view, source, ...) under `name`.
    CreateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        item: CatalogItem,
        owner_id: RoleId,
    },
    /// Creates a network policy with the given rules.
    CreateNetworkPolicy {
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    /// Sets (`Some`) or clears (`None`) the comment on an object, or on one of
    /// its sub-components when `sub_component` is provided.
    Comment {
        object_id: CommentObjectId,
        sub_component: Option<usize>,
        comment: Option<String>,
    },
    /// Drops the given objects. See [`DropObjectInfo`] for why this is not
    /// simply a `Vec<ObjectId>`.
    DropObjects(Vec<DropObjectInfo>),
    /// Grants membership in `role_id` to `member_id`, granted by `grantor_id`.
    GrantRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    /// Renames a cluster from `name` to `to_name`.
    RenameCluster {
        id: ClusterId,
        name: String,
        to_name: String,
        /// Whether to reject reserved cluster names.
        check_reserved_names: bool,
    },
    /// Renames a replica of a cluster.
    RenameClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: QualifiedReplica,
        to_name: String,
    },
    /// Renames an item within its schema.
    RenameItem {
        id: CatalogItemId,
        current_full_name: FullItemName,
        to_name: String,
    },
    /// Renames a schema.
    RenameSchema {
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        new_name: String,
        /// Whether to reject reserved schema names.
        check_reserved_names: bool,
    },
    /// Transfers ownership of an object to `new_owner`.
    UpdateOwner {
        id: ObjectId,
        new_owner: RoleId,
    },
    /// Grants or revokes (per `variant`) a privilege on an object.
    UpdatePrivilege {
        target_id: SystemObjectId,
        privilege: MzAclItem,
        variant: UpdatePrivilegeVariant,
    },
    /// Grants or revokes (per `variant`) a default privilege.
    UpdateDefaultPrivilege {
        privilege_object: DefaultPrivilegeObject,
        privilege_acl_item: DefaultPrivilegeAclItem,
        variant: UpdatePrivilegeVariant,
    },
    /// Revokes membership in `role_id` from `member_id`.
    RevokeRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    /// Replaces the configuration of a cluster.
    UpdateClusterConfig {
        id: ClusterId,
        name: String,
        config: ClusterConfig,
    },
    /// Replaces the configuration of a cluster replica.
    UpdateClusterReplicaConfig {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        config: ReplicaConfig,
    },
    /// Replaces the definition of the item `id` with `to_item`.
    UpdateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        to_item: CatalogItem,
    },
    /// Replaces the available upstream references for the source `source_id`.
    UpdateSourceReferences {
        source_id: CatalogItemId,
        references: SourceReferences,
    },
    /// Sets a system (LaunchDarkly-style) configuration parameter.
    UpdateSystemConfiguration {
        name: String,
        value: OwnedVarInput,
    },
    /// Resets one system configuration parameter to its default.
    ResetSystemConfiguration {
        name: String,
    },
    /// Resets all system configuration parameters to their defaults.
    ResetAllSystemConfiguration,
    /// Performs updates to the storage usage table, which probably should be a builtin source.
    ///
    /// TODO(jkosh44) In a multi-writer or high availability catalog world, this
    /// might not work. If a process crashes after collecting storage usage events
    /// but before updating the builtin table, then another listening catalog
    /// will never know to update the builtin table.
    WeirdStorageUsageUpdates {
        object_id: Option<String>,
        size_bytes: u64,
        collection_timestamp: EpochMillis,
    },
    /// Performs a dry run of the commit, but errors with
    /// [`AdapterError::TransactionDryRun`].
    ///
    /// When using this value, it should be included only as the last element of
    /// the transaction and should not be the only value in the transaction.
    TransactionDryRun,
}
252
/// Almost the same as `ObjectId`, but the `ClusterReplica` case has an extra
/// `ReplicaCreateDropReason` field. This is forwarded to `mz_audit_events.details` when applying
/// the `Op::DropObjects`.
#[derive(Debug, Clone)]
pub enum DropObjectInfo {
    Cluster(ClusterId),
    /// A replica drop, carrying the reason so the audit log can record whether
    /// the drop was manual or driven by automated cluster scheduling.
    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
    Database(DatabaseId),
    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
    Role(RoleId),
    Item(CatalogItemId),
    NetworkPolicy(NetworkPolicyId),
}
266
267impl DropObjectInfo {
268    /// Creates a `DropObjectInfo` from an `ObjectId`.
269    /// If it is a `ClusterReplica`, the reason will be set to `ReplicaCreateDropReason::Manual`.
270    pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
271        match id {
272            ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
273            ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
274                cluster_id,
275                replica_id,
276                ReplicaCreateDropReason::Manual,
277            )),
278            ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
279            ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
280            ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
281            ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
282            ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
283        }
284    }
285
286    /// Creates an `ObjectId` from a `DropObjectInfo`.
287    /// Loses the `ReplicaCreateDropReason` if there is one!
288    fn to_object_id(&self) -> ObjectId {
289        match &self {
290            DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
291            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
292                ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
293            }
294            DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
295            DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
296            DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
297            DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
298            DropObjectInfo::NetworkPolicy(network_policy_id) => {
299                ObjectId::NetworkPolicy(network_policy_id.clone())
300            }
301        }
302    }
303}
304
/// The reason for creating or dropping a replica.
///
/// Forwarded to the audit log via `into_audit_log`.
#[derive(Debug, Clone)]
pub enum ReplicaCreateDropReason {
    /// The user initiated the replica create or drop, e.g., by
    /// - creating/dropping a cluster,
    /// - ALTERing various options on a managed cluster,
    /// - CREATE/DROP CLUSTER REPLICA on an unmanaged cluster.
    Manual,
    /// The automated cluster scheduling initiated the replica create or drop, e.g., a
    /// materialized view needs a refresh on a SCHEDULE ON REFRESH cluster.
    ClusterScheduling(Vec<SchedulingDecision>),
}
317
318impl ReplicaCreateDropReason {
319    pub fn into_audit_log(
320        self,
321    ) -> (
322        CreateOrDropClusterReplicaReasonV1,
323        Option<SchedulingDecisionsWithReasonsV2>,
324    ) {
325        let (reason, scheduling_policies) = match self {
326            ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
327            ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
328                CreateOrDropClusterReplicaReasonV1::Schedule,
329                Some(scheduling_decisions),
330            ),
331        };
332        (
333            reason,
334            scheduling_policies
335                .as_ref()
336                .map(SchedulingDecision::reasons_to_audit_log_reasons),
337        )
338    }
339}
340
/// The outputs of a successfully committed catalog transaction.
pub struct TransactionResult {
    /// Updates that must be applied to the builtin (system) tables.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Parsed catalog updates from which we will derive catalog implications.
    pub catalog_updates: Vec<ParsedStateUpdate>,
    /// Audit log events emitted by the transaction.
    pub audit_events: Vec<VersionedEvent>,
}
347
348impl Catalog {
    /// Returns whether operations on `item` should be recorded in the audit
    /// log. Temporary items are excluded.
    fn should_audit_log_item(item: &CatalogItem) -> bool {
        !item.is_temporary()
    }
352
353    /// Gets [`CatalogItemId`]s of temporary items to be created, checks for name collisions
354    /// within a connection id.
355    fn temporary_ids(
356        &self,
357        ops: &[Op],
358        temporary_drops: BTreeSet<(&ConnectionId, String)>,
359    ) -> Result<BTreeSet<CatalogItemId>, Error> {
360        let mut creating = BTreeSet::new();
361        let mut temporary_ids = BTreeSet::new();
362        for op in ops.iter() {
363            if let Op::CreateItem {
364                id,
365                name,
366                item,
367                owner_id: _,
368            } = op
369            {
370                if let Some(conn_id) = item.conn_id() {
371                    if self.item_exists_in_temp_schemas(conn_id, &name.item)
372                        && !temporary_drops.contains(&(conn_id, name.item.clone()))
373                        || creating.contains(&(conn_id, &name.item))
374                    {
375                        return Err(
376                            SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
377                        );
378                    } else {
379                        creating.insert((conn_id, &name.item));
380                        temporary_ids.insert(id.clone());
381                    }
382                }
383            }
384        }
385        Ok(temporary_ids)
386    }
387
    /// Executes the catalog transaction described by `ops`.
    ///
    /// Applies all operations atomically against a single durable catalog
    /// transaction, then (on success) installs the resulting in-memory catalog
    /// state and bumps the transient revision. If committing the durable
    /// transaction fails, the process is terminated, since the in-memory and
    /// durable state could otherwise diverge.
    ///
    /// Returns the builtin table updates, parsed catalog updates, and audit
    /// log events produced by the transaction.
    ///
    /// # Errors
    /// Returns an error (without committing) if any op fails validation, or
    /// [`AdapterError::TransactionDryRun`] when `ops` ends with
    /// [`Op::TransactionDryRun`].
    #[instrument(name = "catalog::transact")]
    pub async fn transact(
        &mut self,
        // n.b. this is an option to prevent us from needing to build out a
        // dummy impl of `StorageController` for tests.
        storage_collections: Option<
            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
        >,
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        ops: Vec<Op>,
    ) -> Result<TransactionResult, AdapterError> {
        trace!("transact: {:?}", ops);
        // Test-only failure injection point.
        fail::fail_point!("catalog_transact", |arg| {
            Err(AdapterError::Unstructured(anyhow::anyhow!(
                "failpoint: {arg:?}"
            )))
        });

        // Item IDs of every item dropped by this transaction.
        let drop_ids: BTreeSet<CatalogItemId> = ops
            .iter()
            .filter_map(|op| match op {
                Op::DropObjects(drop_object_infos) => {
                    let ids = drop_object_infos.iter().map(|info| info.to_object_id());
                    let item_ids = ids.filter_map(|id| match id {
                        ObjectId::Item(id) => Some(id),
                        _ => None,
                    });
                    Some(item_ids)
                }
                _ => None,
            })
            .flatten()
            .collect();
        // `(connection, name)` pairs of dropped temporary items; used below to
        // allow re-creating a temporary name dropped in the same transaction.
        let temporary_drops = drop_ids
            .iter()
            .filter_map(|id| {
                let entry = self.get_entry(id);
                match entry.item().conn_id() {
                    Some(conn_id) => Some((conn_id, entry.name().item.clone())),
                    None => None,
                }
            })
            .collect();
        // Collect `GlobalId`s now, while the entries still exist, so we can
        // drop their in-memory plans after the transaction commits.
        let dropped_global_ids = drop_ids
            .iter()
            .flat_map(|item_id| self.get_global_ids(item_id))
            .collect();

        let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
        let mut builtin_table_updates = vec![];
        let mut catalog_updates = vec![];
        let mut audit_events = vec![];
        let mut storage = self.storage().await;
        let mut tx = storage
            .transaction()
            .await
            .unwrap_or_terminate("starting catalog transaction");

        let new_state = Self::transact_inner(
            storage_collections,
            oracle_write_ts,
            session,
            ops,
            temporary_ids,
            &mut builtin_table_updates,
            &mut catalog_updates,
            &mut audit_events,
            &mut tx,
            &self.state,
        )
        .await?;

        // The user closure was successful, apply the updates. Terminate the
        // process if this fails, because we have to restart envd due to
        // indeterminate catalog state, which we only reconcile during catalog
        // init.
        tx.commit(oracle_write_ts)
            .await
            .unwrap_or_terminate("catalog storage transaction commit must succeed");

        // Dropping here keeps the mutable borrow on self, preventing us accidentally
        // mutating anything until after f is executed.
        drop(storage);
        if let Some(new_state) = new_state {
            self.transient_revision += 1;
            self.state = new_state;
        }

        // Drop in-memory planning metadata.
        let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
        if self.state.system_config().enable_mz_notices() {
            // Generate retractions for the Builtin tables.
            self.state().pack_optimizer_notices(
                &mut builtin_table_updates,
                dropped_notices.iter(),
                Diff::MINUS_ONE,
            );
        }

        Ok(TransactionResult {
            builtin_table_updates,
            catalog_updates,
            audit_events,
        })
    }
494
495    /// Extracts optimized expressions from `Op::CreateItem` operations for views
496    /// and materialized views. These can be used to populate a `LocalExpressionCache`
497    /// to avoid re-optimization during `apply_updates`.
498    fn extract_expressions_from_ops(
499        ops: &[Op],
500        optimizer_features: &OptimizerFeatures,
501    ) -> BTreeMap<GlobalId, LocalExpressions> {
502        let mut exprs = BTreeMap::new();
503
504        for op in ops {
505            if let Op::CreateItem { item, .. } = op {
506                match item {
507                    CatalogItem::View(view) => {
508                        exprs.insert(
509                            view.global_id,
510                            LocalExpressions {
511                                local_mir: (*view.optimized_expr).clone(),
512                                optimizer_features: optimizer_features.clone(),
513                            },
514                        );
515                    }
516                    CatalogItem::MaterializedView(mv) => {
517                        exprs.insert(
518                            mv.global_id_writes(),
519                            LocalExpressions {
520                                local_mir: (*mv.optimized_expr).clone(),
521                                optimizer_features: optimizer_features.clone(),
522                            },
523                        );
524                    }
525                    _ => {}
526                }
527            }
528        }
529
530        exprs
531    }
532
    /// Performs the transaction described by `ops` and returns the new state of the catalog, if
    /// it has changed. If `ops` don't result in a change in the state this method returns `None`.
    ///
    /// All durable changes are staged in `tx`; committing `tx` is the caller's
    /// responsibility. Builtin table updates, parsed catalog updates, and audit
    /// events are accumulated into the corresponding `&mut Vec` parameters.
    ///
    /// # Panics
    /// - If `ops` contains [`Op::TransactionDryRun`] and the value is not the
    ///   final element.
    /// - If the only element of `ops` is [`Op::TransactionDryRun`].
    #[instrument(name = "catalog::transact_inner")]
    async fn transact_inner(
        storage_collections: Option<
            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
        >,
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        mut ops: Vec<Op>,
        temporary_ids: BTreeSet<CatalogItemId>,
        builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
        parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
        audit_events: &mut Vec<VersionedEvent>,
        tx: &mut Transaction<'_>,
        state: &CatalogState,
    ) -> Result<Option<CatalogState>, AdapterError> {
        // We come up with new catalog state, builtin state updates, and parsed
        // catalog updates (for deriving catalog implications) in two phases:
        //
        // 1. We (cow)-clone catalog state as `preliminary_state` and apply ops
        //    one-by-one. This will give us the full list of updates to apply to
        //    the catalog, which will allow us to apply it in one batch, which
        //    in turn will allow the apply machinery to consolidate the updates.
        // 2. We do one final apply call with all updates, which gives us the
        //    final builtin table updates and parsed catalog updates.
        //
        // The reason is that the loop that is working off ops first does a
        // transact_op to derive the state updates for that op, and then calls
        // apply_updates on the catalog state. And successive ops might expect
        // the catalog state to reflect the modified state _after_ applying
        // previous ops.
        //
        // We want to, however, have one final apply_state that takes all the
        // accumulated updates to derive the required controller updates and the
        // builtin table updates.
        //
        // We won't win any DDL throughput benchmarks, but so far that's not
        // what we're optimizing for and there would probably be other
        // bottlenecks before we hit this one as a bottleneck.
        //
        // We could work around this by refactoring how the interplay of
        // transact_op and apply_updates works, but that's a larger undertaking.
        let mut preliminary_state = Cow::Borrowed(state);

        // The final state that we will return, if modified.
        let mut state = Cow::Borrowed(state);

        // If the last op is the dry-run marker, remember the remaining (real)
        // ops so we can return them in the `TransactionDryRun` error below.
        let dry_run_ops = match ops.last() {
            Some(Op::TransactionDryRun) => {
                // Remove dry run marker.
                ops.pop();
                assert!(!ops.is_empty(), "TransactionDryRun must not be the only op");
                ops.clone()
            }
            Some(_) => vec![],
            // An empty transaction changes nothing.
            None => return Ok(None),
        };

        // Extract optimized expressions from CreateItem ops to avoid re-optimization
        // during apply_updates. We extract before the loop since `ops` is moved there.
        let optimizer_features = OptimizerFeatures::from(state.system_config());
        let cached_exprs = Self::extract_expressions_from_ops(&ops, &optimizer_features);

        let mut storage_collections_to_create = BTreeSet::new();
        let mut storage_collections_to_drop = BTreeSet::new();
        let mut storage_collections_to_register = BTreeMap::new();

        // Accumulates every state update across all ops, for the final batch
        // apply against `state` below.
        let mut updates = Vec::new();

        for op in ops {
            let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
                oracle_write_ts,
                session,
                op,
                &temporary_ids,
                audit_events,
                tx,
                &*preliminary_state,
                &mut storage_collections_to_create,
                &mut storage_collections_to_drop,
                &mut storage_collections_to_register,
            )
            .await?;

            // Certain builtin tables are not derived from the durable catalog state, so they need
            // to be updated ad-hoc based on the current transaction. This is weird and will not
            // work if we ever want multi-writer catalogs or high availability (HA) catalogs. If
            // this instance crashes after committing the durable catalog but before applying the
            // weird builtin table updates, then they'll be lost forever in a multi-writer or HA
            // world. Currently, this works fine and there are no correctness issues because
            // whenever a Coordinator crashes, a new Coordinator starts up, and it will set the
            // state of all builtin tables to the correct values.
            if let Some(builtin_table_update) = weird_builtin_table_update {
                builtin_table_updates.push(builtin_table_update);
            }

            // Temporary items are not stored in the durable catalog, so they need to be handled
            // separately for updating state and builtin tables.
            // TODO(jkosh44) Some more thought needs to be given as to how temporary tables work
            // in a multi-subscriber catalog world.
            let upper = tx.upper();
            let temporary_item_updates =
                temporary_item_updates
                    .into_iter()
                    .map(|(item, diff)| StateUpdate {
                        kind: StateUpdateKind::TemporaryItem(item),
                        ts: upper,
                        diff,
                    });

            let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
            op_updates.extend(temporary_item_updates);
            if !op_updates.is_empty() {
                // Clone the cache so each apply_updates call has access to cached expressions.
                // The cache uses `remove` semantics, so we need a fresh clone for each call.
                let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
                // Builtin table / catalog updates are discarded here; only the
                // final batch apply below produces the ones we return.
                let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
                    .to_mut()
                    .apply_updates(op_updates.clone(), &mut local_expr_cache)
                    .await;
            }
            updates.append(&mut op_updates);
        }

        // Phase 2: one batched apply against `state`, producing the final
        // builtin table updates and parsed catalog updates.
        if !updates.is_empty() {
            let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
            let (op_builtin_table_updates, op_catalog_updates) = state
                .to_mut()
                .apply_updates(updates.clone(), &mut local_expr_cache)
                .await;
            let op_builtin_table_updates = state
                .to_mut()
                .resolve_builtin_table_updates(op_builtin_table_updates);
            builtin_table_updates.extend(op_builtin_table_updates);
            parsed_catalog_updates.extend(op_catalog_updates);
        }

        if dry_run_ops.is_empty() {
            // `storage_collections` should only be `None` for tests.
            if let Some(c) = storage_collections {
                c.prepare_state(
                    tx,
                    storage_collections_to_create,
                    storage_collections_to_drop,
                    storage_collections_to_register,
                )
                .await?;
            }

            // `prepare_state` may have staged further updates in `tx`; apply
            // those too.
            let updates = tx.get_and_commit_op_updates();
            if !updates.is_empty() {
                let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
                let (op_builtin_table_updates, op_catalog_updates) = state
                    .to_mut()
                    .apply_updates(updates.clone(), &mut local_expr_cache)
                    .await;
                let op_builtin_table_updates = state
                    .to_mut()
                    .resolve_builtin_table_updates(op_builtin_table_updates);
                builtin_table_updates.extend(op_builtin_table_updates);
                parsed_catalog_updates.extend(op_catalog_updates);
            }

            // `Cow::Owned` means some update actually mutated the state.
            match state {
                Cow::Owned(state) => Ok(Some(state)),
                Cow::Borrowed(_) => Ok(None),
            }
        } else {
            Err(AdapterError::TransactionDryRun {
                new_ops: dry_run_ops,
                new_state: state.into_owned(),
            })
        }
    }
713
714    /// Performs the transaction operation described by `op`. This function prepares the changes in
715    /// `tx`, but does not update `state`. `state` will be updated when applying the durable
716    /// changes.
717    ///
718    /// Optionally returns a builtin table update for any builtin table updates than cannot be
719    /// derived from the durable catalog state, and temporary item diffs. These are all very weird
720    /// scenarios and ideally in the future don't exist.
721    #[instrument]
722    async fn transact_op(
723        oracle_write_ts: mz_repr::Timestamp,
724        session: Option<&ConnMeta>,
725        op: Op,
726        temporary_ids: &BTreeSet<CatalogItemId>,
727        audit_events: &mut Vec<VersionedEvent>,
728        tx: &mut Transaction<'_>,
729        state: &CatalogState,
730        storage_collections_to_create: &mut BTreeSet<GlobalId>,
731        storage_collections_to_drop: &mut BTreeSet<GlobalId>,
732        storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
733    ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
734        let mut weird_builtin_table_update = None;
735        let mut temporary_item_updates = Vec::new();
736
737        match op {
738            Op::TransactionDryRun => {
739                unreachable!("TransactionDryRun can only be used a final element of ops")
740            }
741            Op::AlterRetainHistory { id, value, window } => {
                // ALTER ... RETAIN HISTORY: system items are read-only, so reject
                // them up front with a fully-qualified name for the error message.
742                let entry = state.get_entry(&id);
743                if id.is_system() {
744                    let name = entry.name();
745                    let full_name =
746                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
747                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
748                        full_name.to_string(),
749                    ))));
750                }
751
                // Mutate a clone of the in-memory entry; `update_retain_history`
                // returns the previous setting (used below for the audit event).
                // An Err here means the planner let through an item type that
                // doesn't support RETAIN HISTORY — treated as an internal error.
752                let mut new_entry = entry.clone();
753                let previous = new_entry
754                    .item
755                    .update_retain_history(value.clone(), window)
756                    .map_err(|_| {
757                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
758                            "planner should have rejected invalid alter retain history item type"
759                                .to_string(),
760                        )))
761                    })?;
762
                // Only audited item types produce an audit-log event.
763                if Self::should_audit_log_item(new_entry.item()) {
764                    let details =
765                        EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
766                            id: id.to_string(),
767                            old_history: previous.map(|previous| previous.to_string()),
768                            new_history: value.map(|v| v.to_string()),
769                        });
770                    CatalogState::add_to_audit_log(
771                        &state.system_configuration,
772                        oracle_write_ts,
773                        session,
774                        tx,
775                        audit_events,
776                        EventType::Alter,
777                        catalog_type_to_audit_object_type(new_entry.item().typ()),
778                        details,
779                    )?;
780                }
781
                // Persist the updated item in the durable transaction, then log.
782                tx.update_item(id, new_entry.into())?;
783
784                Self::log_update(state, &id);
785            }
786            Op::AlterSourceTimestampInterval {
787                id,
788                value,
789                interval,
790            } => {
                // Same read-only guard as AlterRetainHistory: system items cannot
                // have their timestamp interval altered.
791                let entry = state.get_entry(&id);
792                if id.is_system() {
793                    let name = entry.name();
794                    let full_name =
795                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
796                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
797                        full_name.to_string(),
798                    ))));
799                }
800
                // Apply the new interval to a cloned entry; failure means the
                // planner accepted an item type that doesn't support this alter,
                // which is surfaced as an internal error. Unlike the retain-history
                // arm, no audit-log event is emitted here.
801                let mut new_entry = entry.clone();
802                new_entry
803                    .item
804                    .update_timestamp_interval(value, interval)
805                    .map_err(|_| {
806                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
807                            "planner should have rejected invalid alter timestamp interval item type"
808                                .to_string(),
809                        )))
810                    })?;
811
812                tx.update_item(id, new_entry.into())?;
813
814                Self::log_update(state, &id);
815            }
816            Op::AlterRole {
817                id,
818                name,
819                attributes,
820                nopassword,
821                vars,
822            } => {
                // Reserved/system roles may not be altered.
823                state.ensure_not_reserved_role(&id)?;
824
                // Extract the password before `attributes` is consumed by `into()`
                // below. SCRAM iteration count falls back to the system config's
                // value when the attributes don't specify one.
825                let mut existing_role = state.get_role(&id).clone();
826                let password = attributes.password.clone();
827                let scram_iterations = attributes
828                    .scram_iterations
829                    .unwrap_or_else(|| state.system_config().scram_iterations());
830                existing_role.attributes = attributes.into();
831                existing_role.vars = vars;
                // Decide how the stored password changes: NOPASSWORD clears it, a
                // provided password replaces it, otherwise it is left untouched.
832                let password_action = if nopassword {
833                    PasswordAction::Clear
834                } else if let Some(password) = password {
835                    PasswordAction::Set(PasswordConfig {
836                        password,
837                        scram_iterations,
838                    })
839                } else {
840                    PasswordAction::NoChange
841                };
842                tx.update_role(id, existing_role.into(), password_action)?;
843
844                CatalogState::add_to_audit_log(
845                    &state.system_configuration,
846                    oracle_write_ts,
847                    session,
848                    tx,
849                    audit_events,
850                    EventType::Alter,
851                    ObjectType::Role,
852                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
853                        id: id.to_string(),
854                        name: name.clone(),
855                    }),
856                )?;
857
858                info!("update role {name} ({id})");
859            }
860            Op::AlterNetworkPolicy {
861                id,
862                rules,
863                name,
864                owner_id: _owner_id,
865                // owner changes are not applied by this arm (owner_id is ignored).
866            } => {
                // Replace the policy's rule set on a converted copy of the
                // existing in-memory policy.
866                let existing_policy = state.get_network_policy(&id).clone();
867                let mut policy: NetworkPolicy = existing_policy.into();
868                policy.rules = rules;
                // NOTE(review): the reserved-name check runs after the local
                // mutation above, but before any durable write, so ordering is
                // still safe — nothing is persisted on the error path.
869                if is_reserved_name(&name) {
870                    return Err(AdapterError::Catalog(Error::new(
871                        ErrorKind::ReservedNetworkPolicyName(name),
872                    )));
873                }
874                tx.update_network_policy(id, policy.clone())?;
875
876                CatalogState::add_to_audit_log(
877                    &state.system_configuration,
878                    oracle_write_ts,
879                    session,
880                    tx,
881                    audit_events,
882                    EventType::Alter,
883                    ObjectType::NetworkPolicy,
884                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
885                        id: id.to_string(),
886                        name: name.clone(),
887                    }),
888                )?;
889
890                info!("update network policy {name} ({id})");
891            }
892            Op::AlterAddColumn {
893                id,
894                new_global_id,
895                name,
896                typ,
897                sql,
898            } => {
                // Adds a column to an item, producing a new version of it that is
                // identified by `new_global_id`.
899                let mut new_entry = state.get_entry(&id).clone();
900                let version = new_entry.item.add_column(name, typ, sql)?;
901                // All versions of a table share the same shard, so it shouldn't matter what
902                // GlobalId we use here.
903                let shard_id = state
904                    .storage_metadata()
905                    .get_collection_shard(new_entry.latest_global_id())?;
906
907                // TODO(alter_table): Support adding columns to sources.
908                let CatalogItem::Table(table) = &mut new_entry.item else {
909                    return Err(AdapterError::Unsupported("adding columns to non-Table"));
910                };
                // Record the new version -> GlobalId mapping, persist the updated
                // item, and register the new GlobalId against the shared shard.
911                table.collections.insert(version, new_global_id);
912
913                tx.update_item(id, new_entry.into())?;
914                storage_collections_to_register.insert(new_global_id, shard_id);
915            }
916            Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
                // ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT: fold the
                // replacement MV (replacement_id) into the original MV (id).
917                let mut new_entry = state.get_entry(&id).clone();
918                let replacement = state.get_entry(&replacement_id);
919
                // Compute audit events before mutating; they are emitted at the
                // end of this arm.
920                let new_audit_events =
921                    apply_replacement_audit_events(state, &new_entry, replacement);
922
                // Both sides must be materialized views; anything else indicates a
                // planner bug and is reported as an internal error.
923                let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
924                    return Err(AdapterError::internal(
925                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
926                        "id must refer to a materialized view",
927                    ));
928                };
929                let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
930                    return Err(AdapterError::internal(
931                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
932                        "replacement_id must refer to a materialized view",
933                    ));
934                };
935
936                mv.apply_replacement(replacement_mv.clone());
937
                // The replacement item itself is removed; the merged entry is
                // re-inserted under the replacement's item id (note the id swap
                // below), and any comments attached to the replacement are dropped.
938                tx.remove_item(replacement_id)?;
939
940                new_entry.id = replacement_id;
941                tx_replace_item(tx, state, id, new_entry)?;
942
943                let comment_id = CommentObjectId::MaterializedView(replacement_id);
944                tx.drop_comments(&[comment_id].into())?;
945
946                for (event_type, details) in new_audit_events {
947                    CatalogState::add_to_audit_log(
948                        &state.system_configuration,
949                        oracle_write_ts,
950                        session,
951                        tx,
952                        audit_events,
953                        event_type,
954                        ObjectType::MaterializedView,
955                        details,
956                    )?;
957                }
958            }
959            Op::CreateDatabase { name, owner_id } => {
                // Creating a database also creates its default ("public") schema,
                // so two privilege sets and two audit events are produced.
                //
                // Database privileges = owner privilege + applicable default
                // privileges for the Database object type, merged per grantee.
960                let database_owner_privileges = vec![rbac::owner_privilege(
961                    mz_sql::catalog::ObjectType::Database,
962                    owner_id,
963                )];
964                let database_default_privileges = state
965                    .default_privileges
966                    .get_applicable_privileges(
967                        owner_id,
968                        None,
969                        None,
970                        mz_sql::catalog::ObjectType::Database,
971                    )
972                    .map(|item| item.mz_acl_item(owner_id));
973                let database_privileges: Vec<_> = merge_mz_acl_items(
974                    database_owner_privileges
975                        .into_iter()
976                        .chain(database_default_privileges),
977                )
978                .collect();
979
                // Schema privileges follow the same pattern, plus a USAGE grant to
                // PUBLIC on the default schema (see comment below).
980                let schema_owner_privileges = vec![rbac::owner_privilege(
981                    mz_sql::catalog::ObjectType::Schema,
982                    owner_id,
983                )];
984                let schema_default_privileges = state
985                    .default_privileges
986                    .get_applicable_privileges(
987                        owner_id,
988                        None,
989                        None,
990                        mz_sql::catalog::ObjectType::Schema,
991                    )
992                    .map(|item| item.mz_acl_item(owner_id))
993                    // Special default privilege on public schemas.
994                    .chain(std::iter::once(MzAclItem {
995                        grantee: RoleId::Public,
996                        grantor: owner_id,
997                        acl_mode: AclMode::USAGE,
998                    }));
999                let schema_privileges: Vec<_> = merge_mz_acl_items(
1000                    schema_owner_privileges
1001                        .into_iter()
1002                        .chain(schema_default_privileges),
1003                )
1004                .collect();
1005
                // Temporary OIDs are excluded from allocation so the new database
                // and schema don't collide with temp objects' OIDs.
1006                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1007                let (database_id, _) = tx.insert_user_database(
1008                    &name,
1009                    owner_id,
1010                    database_privileges.clone(),
1011                    &temporary_oids,
1012                )?;
1013                let (schema_id, _) = tx.insert_user_schema(
1014                    database_id,
1015                    DEFAULT_SCHEMA,
1016                    owner_id,
1017                    schema_privileges.clone(),
1018                    &temporary_oids,
1019                )?;
                // Audit the database creation, then the default schema creation.
1020                CatalogState::add_to_audit_log(
1021                    &state.system_configuration,
1022                    oracle_write_ts,
1023                    session,
1024                    tx,
1025                    audit_events,
1026                    EventType::Create,
1027                    ObjectType::Database,
1028                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1029                        id: database_id.to_string(),
1030                        name: name.clone(),
1031                    }),
1032                )?;
1033                info!("create database {}", name);
1034
1035                CatalogState::add_to_audit_log(
1036                    &state.system_configuration,
1037                    oracle_write_ts,
1038                    session,
1039                    tx,
1040                    audit_events,
1041                    EventType::Create,
1042                    ObjectType::Schema,
1043                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1044                        id: schema_id.to_string(),
1045                        name: DEFAULT_SCHEMA.to_string(),
1046                        database_name: Some(name),
1047                    }),
1048                )?;
1049            }
1050            Op::CreateSchema {
1051                database_id,
1052                schema_name,
1053                owner_id,
1054            } => {
                // Reject reserved schema names, and reject creation in the
                // ambient (system) database — only concrete databases allowed.
1055                if is_reserved_name(&schema_name) {
1056                    return Err(AdapterError::Catalog(Error::new(
1057                        ErrorKind::ReservedSchemaName(schema_name),
1058                    )));
1059                }
1060                let database_id = match database_id {
1061                    ResolvedDatabaseSpecifier::Id(id) => id,
1062                    ResolvedDatabaseSpecifier::Ambient => {
1063                        return Err(AdapterError::Catalog(Error::new(
1064                            ErrorKind::ReadOnlySystemSchema(schema_name),
1065                        )));
1066                    }
1067                };
                // Privileges = owner privilege + applicable default privileges for
                // Schema objects in this database, merged per grantee.
1068                let owner_privileges = vec![rbac::owner_privilege(
1069                    mz_sql::catalog::ObjectType::Schema,
1070                    owner_id,
1071                )];
1072                let default_privileges = state
1073                    .default_privileges
1074                    .get_applicable_privileges(
1075                        owner_id,
1076                        Some(database_id),
1077                        None,
1078                        mz_sql::catalog::ObjectType::Schema,
1079                    )
1080                    .map(|item| item.mz_acl_item(owner_id));
1081                let privileges: Vec<_> =
1082                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1083                        .collect();
1084                let (schema_id, _) = tx.insert_user_schema(
1085                    database_id,
1086                    &schema_name,
1087                    owner_id,
1088                    privileges.clone(),
1089                    &state.get_temporary_oids().collect(),
1090                )?;
1091                CatalogState::add_to_audit_log(
1092                    &state.system_configuration,
1093                    oracle_write_ts,
1094                    session,
1095                    tx,
1096                    audit_events,
1097                    EventType::Create,
1098                    ObjectType::Schema,
1099                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1100                        id: schema_id.to_string(),
1101                        name: schema_name.clone(),
1102                        database_name: Some(state.database_by_id[&database_id].name.clone()),
1103                    }),
1104                )?;
1105            }
1106            Op::CreateRole { name, attributes } => {
                // New roles start with empty membership and default session vars;
                // reserved role names are rejected up front.
1107                if is_reserved_role_name(&name) {
1108                    return Err(AdapterError::Catalog(Error::new(
1109                        ErrorKind::ReservedRoleName(name),
1110                    )));
1111                }
1112                let membership = RoleMembership::new();
1113                let vars = RoleVars::default();
1114                let (id, _) = tx.insert_user_role(
1115                    name.clone(),
1116                    attributes.clone(),
1117                    membership.clone(),
1118                    vars.clone(),
1119                    &state.get_temporary_oids().collect(),
1120                )?;
1121                CatalogState::add_to_audit_log(
1122                    &state.system_configuration,
1123                    oracle_write_ts,
1124                    session,
1125                    tx,
1126                    audit_events,
1127                    EventType::Create,
1128                    ObjectType::Role,
1129                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1130                        id: id.to_string(),
1131                        name: name.clone(),
1132                    }),
1133                )?;
1134                info!("create role {}", name);
1135            }
1136            Op::CreateCluster {
1137                id,
1138                name,
1139                introspection_sources,
1140                owner_id,
1141                config,
1142            } => {
1143                if is_reserved_name(&name) {
1144                    return Err(AdapterError::Catalog(Error::new(
1145                        ErrorKind::ReservedClusterName(name),
1146                    )));
1147                }
                // Privileges = owner privilege + applicable default privileges for
                // the Cluster object type, merged per grantee.
1148                let owner_privileges = vec![rbac::owner_privilege(
1149                    mz_sql::catalog::ObjectType::Cluster,
1150                    owner_id,
1151                )];
1152                let default_privileges = state
1153                    .default_privileges
1154                    .get_applicable_privileges(
1155                        owner_id,
1156                        None,
1157                        None,
1158                        mz_sql::catalog::ObjectType::Cluster,
1159                    )
1160                    .map(|item| item.mz_acl_item(owner_id));
1161                let privileges: Vec<_> =
1162                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1163                        .collect();
                // Allocate an (item id, global id) pair for each builtin
                // introspection source index on this cluster; `zip_eq` asserts the
                // two lists stay the same length.
1164                let introspection_source_ids: Vec<_> = introspection_sources
1165                    .iter()
1166                    .map(|introspection_source| {
1167                        Transaction::allocate_introspection_source_index_id(
1168                            &id,
1169                            introspection_source.variant,
1170                        )
1171                    })
1172                    .collect();
1173
1174                let introspection_sources = introspection_sources
1175                    .into_iter()
1176                    .zip_eq(introspection_source_ids)
1177                    .map(|(log, (item_id, gid))| (log, item_id, gid))
1178                    .collect();
1179
1180                tx.insert_user_cluster(
1181                    id,
1182                    &name,
1183                    introspection_sources,
1184                    owner_id,
1185                    privileges.clone(),
1186                    config.clone().into(),
1187                    &state.get_temporary_oids().collect(),
1188                )?;
1189                CatalogState::add_to_audit_log(
1190                    &state.system_configuration,
1191                    oracle_write_ts,
1192                    session,
1193                    tx,
1194                    audit_events,
1195                    EventType::Create,
1196                    ObjectType::Cluster,
1197                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1198                        id: id.to_string(),
1199                        name: name.clone(),
1200                    }),
1201                )?;
1202                info!("create cluster {}", name);
1203            }
1204            Op::CreateClusterReplica {
1205                cluster_id,
1206                name,
1207                config,
1208                owner_id,
1209                reason,
1210            } => {
1211                if is_reserved_name(&name) {
1212                    return Err(AdapterError::Catalog(Error::new(
1213                        ErrorKind::ReservedReplicaName(name),
1214                    )));
1215                }
1216                let cluster = state.get_cluster(cluster_id);
1217                let id =
1218                    tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
                // Only managed replicas carry size/billing metadata, and only they
                // produce a CreateClusterReplicaV4 audit event; unmanaged replica
                // creation is not audited here.
1219                if let ReplicaLocation::Managed(ManagedReplicaLocation {
1220                    size,
1221                    billed_as,
1222                    internal,
1223                    ..
1224                }) = &config.location
1225                {
1226                    let (reason, scheduling_policies) = reason.into_audit_log();
1227                    let details = EventDetails::CreateClusterReplicaV4(
1228                        mz_audit_log::CreateClusterReplicaV4 {
1229                            cluster_id: cluster_id.to_string(),
1230                            cluster_name: cluster.name.clone(),
1231                            replica_id: Some(id.to_string()),
1232                            replica_name: name.clone(),
1233                            logical_size: size.clone(),
1234                            billed_as: billed_as.clone(),
1235                            internal: *internal,
1236                            reason,
1237                            scheduling_policies,
1238                        },
1239                    );
1240                    CatalogState::add_to_audit_log(
1241                        &state.system_configuration,
1242                        oracle_write_ts,
1243                        session,
1244                        tx,
1245                        audit_events,
1246                        EventType::Create,
1247                        ObjectType::ClusterReplica,
1248                        details,
1249                    )?;
1250                }
1251            }
1252            Op::CreateItem {
1253                id,
1254                name,
1255                item,
1256                owner_id,
1257            } => {
1258                state.check_unstable_dependencies(&item)?;
1259
                // Record which storage collections this item needs: most
                // storage-backed item types get a fresh collection; a materialized
                // view created as a replacement instead *registers* against its
                // target's existing shard. Non-storage item types need nothing.
1260                match &item {
1261                    CatalogItem::Table(table) => {
1262                        let gids: Vec<_> = table.global_ids().collect();
                        // A newly created table has exactly one version/GlobalId.
1263                        assert_eq!(gids.len(), 1);
1264                        storage_collections_to_create.extend(gids);
1265                    }
1266                    CatalogItem::Source(source) => {
1267                        storage_collections_to_create.insert(source.global_id());
1268                    }
1269                    CatalogItem::MaterializedView(mv) => {
1270                        let mv_gid = mv.global_id_writes();
1271                        if let Some(target_id) = mv.replacement_target {
1272                            let target_gid = state.get_entry(&target_id).latest_global_id();
1273                            let shard_id =
1274                                state.storage_metadata().get_collection_shard(target_gid)?;
1275                            storage_collections_to_register.insert(mv_gid, shard_id);
1276                        } else {
1277                            storage_collections_to_create.insert(mv_gid);
1278                        }
1279                    }
1280                    CatalogItem::ContinualTask(ct) => {
1281                        storage_collections_to_create.insert(ct.global_id());
1282                    }
1283                    CatalogItem::Sink(sink) => {
1284                        storage_collections_to_create.insert(sink.global_id());
1285                    }
1286                    CatalogItem::Log(_)
1287                    | CatalogItem::View(_)
1288                    | CatalogItem::Index(_)
1289                    | CatalogItem::Type(_)
1290                    | CatalogItem::Func(_)
1291                    | CatalogItem::Secret(_)
1292                    | CatalogItem::Connection(_) => (),
1293                }
1294
                // Only system users may create items on system clusters.
1295                let system_user = session.map_or(false, |s| s.user().is_system_user());
1296                if !system_user {
1297                    if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1298                        let cluster_name = state.clusters_by_id[&id].name.clone();
1299                        return Err(AdapterError::Catalog(Error::new(
1300                            ErrorKind::ReadOnlyCluster(cluster_name),
1301                        )));
1302                    }
1303                }
1304
                // Privileges = owner privilege + applicable default privileges for
                // this item type in its database/schema, plus (for progress
                // sources) a SELECT grant to mz_support — all merged per grantee.
1305                let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1306                let default_privileges = state
1307                    .default_privileges
1308                    .get_applicable_privileges(
1309                        owner_id,
1310                        name.qualifiers.database_spec.id(),
1311                        Some(name.qualifiers.schema_spec.into()),
1312                        item.typ().into(),
1313                    )
1314                    .map(|item| item.mz_acl_item(owner_id));
1315                // mz_support can read all progress sources.
1316                let progress_source_privilege = if item.is_progress_source() {
1317                    Some(MzAclItem {
1318                        grantee: MZ_SUPPORT_ROLE_ID,
1319                        grantor: owner_id,
1320                        acl_mode: AclMode::SELECT,
1321                    })
1322                } else {
1323                    None
1324                };
1325                let privileges: Vec<_> = merge_mz_acl_items(
1326                    owner_privileges
1327                        .into_iter()
1328                        .chain(default_privileges)
1329                        .chain(progress_source_privilege),
1330                )
1331                .collect();
1332
1333                let temporary_oids = state.get_temporary_oids().collect();
1334
                // Temporary items live only in the ambient database's temporary
                // schema and are tracked in-memory (temporary_item_updates) rather
                // than written to the durable transaction.
1335                if item.is_temporary() {
1336                    if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1337                        || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1338                    {
1339                        return Err(AdapterError::Catalog(Error::new(
1340                            ErrorKind::InvalidTemporarySchema,
1341                        )));
1342                    }
1343                    let oid = tx.allocate_oid(&temporary_oids)?;
1344
1345                    let schema_id = name.qualifiers.schema_spec.clone().into();
1346                    let item_type = item.typ();
1347                    let (create_sql, global_id, versions) = item.to_serialized();
1348
1349                    let item = TemporaryItem {
1350                        id,
1351                        oid,
1352                        global_id,
1353                        schema_id,
1354                        name: name.item.clone(),
1355                        create_sql,
1356                        conn_id: item.conn_id().cloned(),
1357                        owner_id,
1358                        privileges: privileges.clone(),
1359                        extra_versions: versions,
1360                    };
1361                    temporary_item_updates.push((item, StateDiff::Addition));
1362
1363                    info!(
1364                        "create temporary {} {} ({})",
1365                        item_type,
1366                        state.resolve_full_name(&name, None),
1367                        id
1368                    );
1369                } else {
                    // Durable items may not depend on temporary items; check both
                    // already-cataloged entries and ids pending in this
                    // transaction (`temporary_ids`).
1370                    if let Some(temp_id) =
1371                        item.uses()
1372                            .iter()
1373                            .find(|id| match state.try_get_entry(*id) {
1374                                Some(entry) => entry.item().is_temporary(),
1375                                None => temporary_ids.contains(id),
1376                            })
1377                    {
1378                        let temp_item = state.get_entry(temp_id);
1379                        return Err(AdapterError::Catalog(Error::new(
1380                            ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1381                        )));
1382                    }
                    // Non-system users cannot create items in system (ambient)
                    // schemas.
1383                    if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1384                        && !system_user
1385                    {
1386                        let schema_name = state
1387                            .resolve_full_name(&name, session.map(|session| session.conn_id()))
1388                            .schema;
1389                        return Err(AdapterError::Catalog(Error::new(
1390                            ErrorKind::ReadOnlySystemSchema(schema_name),
1391                        )));
1392                    }
1393                    let schema_id = name.qualifiers.schema_spec.clone().into();
1394                    let item_type = item.typ();
1395                    let (create_sql, global_id, versions) = item.to_serialized();
1396                    tx.insert_user_item(
1397                        id,
1398                        global_id,
1399                        schema_id,
1400                        &name.item,
1401                        create_sql,
1402                        owner_id,
1403                        privileges.clone(),
1404                        &temporary_oids,
1405                        versions,
1406                    )?;
1407                    info!(
1408                        "create {} {} ({})",
1409                        item_type,
1410                        state.resolve_full_name(&name, None),
1411                        id
1412                    );
1413                }
1414
                // Emit a Create audit event with per-type details (sources/sinks
                // record cluster and external type; indexes and materialized views
                // record their cluster; everything else just id + full name).
1415                if Self::should_audit_log_item(&item) {
1416                    let name = Self::full_name_detail(
1417                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1418                    );
1419                    let details = match &item {
1420                        CatalogItem::Source(s) => {
1421                            let cluster_id = match s.data_source {
1422                                // Ingestion exports don't have their own cluster, but
1423                                // run on their ingestion's cluster.
1424                                DataSourceDesc::IngestionExport { ingestion_id, .. } => {
                                    // NOTE(review): this match is `Option::map` spelled out.
1425                                    match state.get_entry(&ingestion_id).cluster_id() {
1426                                        Some(cluster_id) => Some(cluster_id.to_string()),
1427                                        None => None,
1428                                    }
1429                                }
1430                                _ => match item.cluster_id() {
1431                                    Some(cluster_id) => Some(cluster_id.to_string()),
1432                                    None => None,
1433                                },
1434                            };
1435
1436                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1437                                id: id.to_string(),
1438                                cluster_id,
1439                                name,
1440                                external_type: s.source_type().to_string(),
1441                            })
1442                        }
1443                        CatalogItem::Sink(s) => {
1444                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1445                                id: id.to_string(),
1446                                cluster_id: Some(s.cluster_id.to_string()),
1447                                name,
1448                                external_type: s.sink_type().to_string(),
1449                            })
1450                        }
1451                        CatalogItem::Index(i) => {
1452                            EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1453                                id: id.to_string(),
1454                                name,
1455                                cluster_id: i.cluster_id.to_string(),
1456                            })
1457                        }
1458                        CatalogItem::MaterializedView(mv) => {
1459                            EventDetails::CreateMaterializedViewV1(
1460                                mz_audit_log::CreateMaterializedViewV1 {
1461                                    id: id.to_string(),
1462                                    name,
1463                                    cluster_id: mv.cluster_id.to_string(),
1464                                    replacement_target_id: mv
1465                                        .replacement_target
1466                                        .map(|id| id.to_string()),
1467                                },
1468                            )
1469                        }
1470                        _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1471                            id: id.to_string(),
1472                            name,
1473                        }),
1474                    };
1475                    CatalogState::add_to_audit_log(
1476                        &state.system_configuration,
1477                        oracle_write_ts,
1478                        session,
1479                        tx,
1480                        audit_events,
1481                        EventType::Create,
1482                        catalog_type_to_audit_object_type(item.typ()),
1483                        details,
1484                    )?;
1485                }
1486            }
            Op::CreateNetworkPolicy {
                rules,
                name,
                owner_id,
            } => {
                // Reject duplicate names before touching the durable catalog.
                if state.network_policies_by_name.contains_key(&name) {
                    return Err(AdapterError::PlanError(PlanError::Catalog(
                        SqlCatalogError::NetworkPolicyAlreadyExists(name),
                    )));
                }
                // Users may not claim reserved (system) policy names.
                if is_reserved_name(&name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedNetworkPolicyName(name),
                    )));
                }

                // The owner always receives the full owner privilege on the new
                // object; merge in any applicable default privileges for the
                // NetworkPolicy object type.
                let owner_privileges = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::NetworkPolicy,
                    owner_id,
                )];
                let default_privileges = state
                    .default_privileges
                    .get_applicable_privileges(
                        owner_id,
                        None,
                        None,
                        mz_sql::catalog::ObjectType::NetworkPolicy,
                    )
                    .map(|item| item.mz_acl_item(owner_id));
                let privileges: Vec<_> =
                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
                        .collect();

                // OIDs of temporary items are passed so the transaction can
                // avoid allocating a conflicting OID.
                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
                let id = tx.insert_user_network_policy(
                    name.clone(),
                    rules,
                    privileges,
                    owner_id,
                    &temporary_oids,
                )?;

                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Create,
                    ObjectType::NetworkPolicy,
                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                        id: id.to_string(),
                        name: name.clone(),
                    }),
                )?;

                info!("created network policy {name} ({id})");
            }
            Op::Comment {
                object_id,
                sub_component,
                comment,
            } => {
                // Persist the comment change in the durable transaction.
                tx.update_comment(object_id, sub_component, comment)?;
                // Temporary items are excluded from the audit log; anything
                // without a catalog entry is audited unconditionally.
                let entry = state.get_comment_id_entry(&object_id);
                let should_log = entry
                    .map(|entry| Self::should_audit_log_item(entry.item()))
                    // Things that aren't catalog entries can't be temp, so should be logged.
                    .unwrap_or(true);
                // TODO: We need a conn_id to resolve schema names. This means that system-initiated
                // comments won't be logged for now.
                if let (Some(conn_id), true) =
                    (session.map(|session| session.conn_id()), should_log)
                {
                    CatalogState::add_to_audit_log(
                        &state.system_configuration,
                        oracle_write_ts,
                        session,
                        tx,
                        audit_events,
                        EventType::Comment,
                        comment_id_to_audit_object_type(object_id),
                        EventDetails::IdNameV1(IdNameV1 {
                            // CommentObjectIds don't have a great string representation, but debug will do for now.
                            id: format!("{object_id:?}"),
                            name: state.comment_id_to_audit_log_name(object_id, conn_id),
                        }),
                    )?;
                }
            }
            Op::UpdateSourceReferences {
                source_id,
                references,
            } => {
                // Replace the durable set of upstream references for this
                // source, converting each in-memory reference into its durable
                // representation and carrying over the refresh timestamp.
                tx.update_source_references(
                    source_id,
                    references
                        .references
                        .into_iter()
                        .map(|reference| reference.into())
                        .collect(),
                    references.updated_at,
                )?;
            }
1591            Op::DropObjects(drop_object_infos) => {
1592                // Generate all of the objects that need to get dropped.
1593                let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1594
1595                // Drop any associated comments.
1596                tx.drop_comments(&delta.comments)?;
1597
1598                // Drop any items.
1599                let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1600                    delta
1601                        .items
1602                        .iter()
1603                        .map(|id| id)
1604                        .partition(|id| !state.get_entry(*id).item().is_temporary());
1605                tx.remove_items(&durable_items_to_drop)?;
1606                temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1607                    let entry = state.get_entry(&id);
1608                    (entry.clone().into(), StateDiff::Retraction)
1609                }));
1610
1611                for item_id in delta.items {
1612                    let entry = state.get_entry(&item_id);
1613
1614                    // Drop associated storage collections, unless the dropped item is a
1615                    // replacement, in which case the replacement target owns the storage
1616                    // collection.
1617                    if entry.item().is_storage_collection() && entry.replacement_target().is_none()
1618                    {
1619                        storage_collections_to_drop.extend(entry.global_ids());
1620                    }
1621
1622                    if state.source_references.contains_key(&item_id) {
1623                        tx.remove_source_references(item_id)?;
1624                    }
1625
1626                    if Self::should_audit_log_item(entry.item()) {
1627                        CatalogState::add_to_audit_log(
1628                            &state.system_configuration,
1629                            oracle_write_ts,
1630                            session,
1631                            tx,
1632                            audit_events,
1633                            EventType::Drop,
1634                            catalog_type_to_audit_object_type(entry.item().typ()),
1635                            EventDetails::IdFullNameV1(IdFullNameV1 {
1636                                id: item_id.to_string(),
1637                                name: Self::full_name_detail(&state.resolve_full_name(
1638                                    entry.name(),
1639                                    session.map(|session| session.conn_id()),
1640                                )),
1641                            }),
1642                        )?;
1643                    }
1644                    info!(
1645                        "drop {} {} ({})",
1646                        entry.item_type(),
1647                        state.resolve_full_name(entry.name(), entry.conn_id()),
1648                        item_id
1649                    );
1650                }
1651
1652                // Drop any schemas.
1653                let schemas = delta
1654                    .schemas
1655                    .iter()
1656                    .map(|(schema_spec, database_spec)| {
1657                        (SchemaId::from(schema_spec), *database_spec)
1658                    })
1659                    .collect();
1660                tx.remove_schemas(&schemas)?;
1661
1662                for (schema_spec, database_spec) in delta.schemas {
1663                    let schema = state.get_schema(
1664                        &database_spec,
1665                        &schema_spec,
1666                        session
1667                            .map(|session| session.conn_id())
1668                            .unwrap_or(&SYSTEM_CONN_ID),
1669                    );
1670
1671                    let schema_id = SchemaId::from(schema_spec);
1672                    let database_id = match database_spec {
1673                        ResolvedDatabaseSpecifier::Ambient => None,
1674                        ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1675                    };
1676
1677                    CatalogState::add_to_audit_log(
1678                        &state.system_configuration,
1679                        oracle_write_ts,
1680                        session,
1681                        tx,
1682                        audit_events,
1683                        EventType::Drop,
1684                        ObjectType::Schema,
1685                        EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1686                            id: schema_id.to_string(),
1687                            name: schema.name.schema.to_string(),
1688                            database_name: database_id
1689                                .map(|database_id| state.database_by_id[&database_id].name.clone()),
1690                        }),
1691                    )?;
1692                }
1693
1694                // Drop any databases.
1695                tx.remove_databases(&delta.databases)?;
1696
1697                for database_id in delta.databases {
1698                    let database = state.get_database(&database_id).clone();
1699
1700                    CatalogState::add_to_audit_log(
1701                        &state.system_configuration,
1702                        oracle_write_ts,
1703                        session,
1704                        tx,
1705                        audit_events,
1706                        EventType::Drop,
1707                        ObjectType::Database,
1708                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1709                            id: database_id.to_string(),
1710                            name: database.name.clone(),
1711                        }),
1712                    )?;
1713                }
1714
1715                // Drop any roles.
1716                tx.remove_user_roles(&delta.roles)?;
1717
1718                for role_id in delta.roles {
1719                    let role = state
1720                        .roles_by_id
1721                        .get(&role_id)
1722                        .expect("catalog out of sync");
1723
1724                    CatalogState::add_to_audit_log(
1725                        &state.system_configuration,
1726                        oracle_write_ts,
1727                        session,
1728                        tx,
1729                        audit_events,
1730                        EventType::Drop,
1731                        ObjectType::Role,
1732                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1733                            id: role.id.to_string(),
1734                            name: role.name.clone(),
1735                        }),
1736                    )?;
1737                    info!("drop role {}", role.name());
1738                }
1739
1740                // Drop any network policies.
1741                tx.remove_network_policies(&delta.network_policies)?;
1742
1743                for network_policy_id in delta.network_policies {
1744                    let policy = state
1745                        .network_policies_by_id
1746                        .get(&network_policy_id)
1747                        .expect("catalog out of sync");
1748
1749                    CatalogState::add_to_audit_log(
1750                        &state.system_configuration,
1751                        oracle_write_ts,
1752                        session,
1753                        tx,
1754                        audit_events,
1755                        EventType::Drop,
1756                        ObjectType::NetworkPolicy,
1757                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1758                            id: policy.id.to_string(),
1759                            name: policy.name.clone(),
1760                        }),
1761                    )?;
1762                    info!("drop network policy {}", policy.name.clone());
1763                }
1764
1765                // Drop any replicas.
1766                let replicas = delta.replicas.keys().copied().collect();
1767                tx.remove_cluster_replicas(&replicas)?;
1768
1769                for (replica_id, (cluster_id, reason)) in delta.replicas {
1770                    let cluster = state.get_cluster(cluster_id);
1771                    let replica = cluster.replica(replica_id).expect("Must exist");
1772
1773                    let (reason, scheduling_policies) = reason.into_audit_log();
1774                    let details =
1775                        EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1776                            cluster_id: cluster_id.to_string(),
1777                            cluster_name: cluster.name.clone(),
1778                            replica_id: Some(replica_id.to_string()),
1779                            replica_name: replica.name.clone(),
1780                            reason,
1781                            scheduling_policies,
1782                        });
1783                    CatalogState::add_to_audit_log(
1784                        &state.system_configuration,
1785                        oracle_write_ts,
1786                        session,
1787                        tx,
1788                        audit_events,
1789                        EventType::Drop,
1790                        ObjectType::ClusterReplica,
1791                        details,
1792                    )?;
1793                }
1794
1795                // Drop any clusters.
1796                tx.remove_clusters(&delta.clusters)?;
1797
1798                for cluster_id in delta.clusters {
1799                    let cluster = state.get_cluster(cluster_id);
1800
1801                    CatalogState::add_to_audit_log(
1802                        &state.system_configuration,
1803                        oracle_write_ts,
1804                        session,
1805                        tx,
1806                        audit_events,
1807                        EventType::Drop,
1808                        ObjectType::Cluster,
1809                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1810                            id: cluster.id.to_string(),
1811                            name: cluster.name.clone(),
1812                        }),
1813                    )?;
1814                }
1815            }
            Op::GrantRole {
                role_id,
                member_id,
                grantor_id,
            } => {
                // Reserved roles cannot receive memberships, and only grantable
                // roles may be granted.
                state.ensure_not_reserved_role(&member_id)?;
                state.ensure_grantable_role(&role_id)?;
                // Refuse grants that would create a circular role membership.
                if state.collect_role_membership(&role_id).contains(&member_id) {
                    let group_role = state.get_role(&role_id);
                    let member_role = state.get_role(&member_id);
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::CircularRoleMembership {
                            role_name: group_role.name().to_string(),
                            member_name: member_role.name().to_string(),
                        },
                    )));
                }
                // Record the new membership edge (role -> grantor) on the member
                // and persist it; the password is left untouched.
                let mut member_role = state.get_role(&member_id).clone();
                member_role.membership.map.insert(role_id, grantor_id);
                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;

                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Grant,
                    ObjectType::Role,
                    EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
                        role_id: role_id.to_string(),
                        member_id: member_id.to_string(),
                        grantor_id: grantor_id.to_string(),
                        // Grants without a session are attributed to the system role.
                        executed_by: session
                            .map(|session| session.authenticated_role_id())
                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
                            .to_string(),
                    }),
                )?;
            }
            Op::RevokeRole {
                role_id,
                member_id,
                grantor_id,
            } => {
                // Mirror image of GrantRole: validate, then remove the
                // membership edge and persist the updated member role.
                state.ensure_not_reserved_role(&member_id)?;
                state.ensure_grantable_role(&role_id)?;
                let mut member_role = state.get_role(&member_id).clone();
                member_role.membership.map.remove(&role_id);
                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;

                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Revoke,
                    ObjectType::Role,
                    EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
                        role_id: role_id.to_string(),
                        member_id: member_id.to_string(),
                        grantor_id: grantor_id.to_string(),
                        // Revokes without a session are attributed to the system role.
                        executed_by: session
                            .map(|session| session.authenticated_role_id())
                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
                            .to_string(),
                    }),
                )?;
            }
            Op::UpdatePrivilege {
                target_id,
                privilege,
                variant,
            } => {
                // Shared helper: apply the grant or revoke to a privilege map
                // in place, so each target type below only differs in how it
                // loads and persists the object.
                let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
                    UpdatePrivilegeVariant::Grant => {
                        privileges.grant(privilege);
                    }
                    UpdatePrivilegeVariant::Revoke => {
                        privileges.revoke(&privilege);
                    }
                };
                match &target_id {
                    SystemObjectId::Object(object_id) => match object_id {
                        ObjectId::Cluster(id) => {
                            let mut cluster = state.get_cluster(*id).clone();
                            update_privilege_fn(&mut cluster.privileges);
                            tx.update_cluster(*id, cluster.into())?;
                        }
                        ObjectId::Database(id) => {
                            let mut database = state.get_database(id).clone();
                            update_privilege_fn(&mut database.privileges);
                            tx.update_database(*id, database.into())?;
                        }
                        ObjectId::NetworkPolicy(id) => {
                            let mut policy = state.get_network_policy(id).clone();
                            update_privilege_fn(&mut policy.privileges);
                            tx.update_network_policy(*id, policy.into())?;
                        }
                        ObjectId::Schema((database_spec, schema_spec)) => {
                            let schema_id = schema_spec.clone().into();
                            let mut schema = state
                                .get_schema(
                                    database_spec,
                                    schema_spec,
                                    session
                                        .map(|session| session.conn_id())
                                        .unwrap_or(&SYSTEM_CONN_ID),
                                )
                                .clone();
                            update_privilege_fn(&mut schema.privileges);
                            tx.update_schema(schema_id, schema.into())?;
                        }
                        ObjectId::Item(id) => {
                            let entry = state.get_entry(id);
                            let mut new_entry = entry.clone();
                            update_privilege_fn(&mut new_entry.privileges);
                            // Temporary items are not durable: update them via
                            // an in-memory retraction + addition pair instead.
                            if !new_entry.item().is_temporary() {
                                tx.update_item(*id, new_entry.into())?;
                            } else {
                                temporary_item_updates
                                    .push((entry.clone().into(), StateDiff::Retraction));
                                temporary_item_updates
                                    .push((new_entry.into(), StateDiff::Addition));
                            }
                        }
                        // Roles and cluster replicas carry no privilege maps.
                        ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
                    },
                    SystemObjectId::System => {
                        // System privileges are stored per (grantee, grantor)
                        // pair; persist the resulting ACL mode (None removes it).
                        let mut system_privileges = PrivilegeMap::clone(&state.system_privileges);
                        update_privilege_fn(&mut system_privileges);
                        let new_privilege =
                            system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
                        tx.set_system_privilege(
                            privilege.grantee,
                            privilege.grantor,
                            new_privilege.map(|new_privilege| new_privilege.acl_mode),
                        )?;
                    }
                }
                let object_type = state.get_system_object_type(&target_id);
                let object_id_str = match &target_id {
                    SystemObjectId::System => "SYSTEM".to_string(),
                    SystemObjectId::Object(id) => id.to_string(),
                };
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    variant.into(),
                    system_object_type_to_audit_object_type(&object_type),
                    EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
                        object_id: object_id_str,
                        grantee_id: privilege.grantee.to_string(),
                        grantor_id: privilege.grantor.to_string(),
                        privileges: privilege.acl_mode.to_string(),
                    }),
                )?;
            }
            Op::UpdateDefaultPrivilege {
                privilege_object,
                privilege_acl_item,
                variant,
            } => {
                // Apply the change to a copy of the in-memory default
                // privileges so the resulting ACL mode for this grantee can be
                // computed before persisting.
                let mut default_privileges = DefaultPrivileges::clone(&state.default_privileges);
                match variant {
                    UpdatePrivilegeVariant::Grant => default_privileges
                        .grant(privilege_object.clone(), privilege_acl_item.clone()),
                    UpdatePrivilegeVariant::Revoke => {
                        default_privileges.revoke(&privilege_object, &privilege_acl_item)
                    }
                }
                // The durable catalog stores the final ACL mode, not the delta.
                let new_acl_mode = default_privileges
                    .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
                tx.set_default_privilege(
                    privilege_object.role_id,
                    privilege_object.database_id,
                    privilege_object.schema_id,
                    privilege_object.object_type,
                    privilege_acl_item.grantee,
                    new_acl_mode.cloned(),
                )?;
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    variant.into(),
                    object_type_to_audit_object_type(privilege_object.object_type),
                    EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
                        role_id: privilege_object.role_id.to_string(),
                        database_id: privilege_object.database_id.map(|id| id.to_string()),
                        schema_id: privilege_object.schema_id.map(|id| id.to_string()),
                        grantee_id: privilege_acl_item.grantee.to_string(),
                        privileges: privilege_acl_item.acl_mode.to_string(),
                    }),
                )?;
            }
            Op::RenameCluster {
                id,
                name,
                to_name,
                check_reserved_names,
            } => {
                // System clusters are read-only and cannot be renamed.
                if id.is_system() {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReadOnlyCluster(name.clone()),
                    )));
                }
                // Callers may opt out of the reserved-name check via
                // `check_reserved_names`.
                if check_reserved_names && is_reserved_name(&to_name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedClusterName(to_name),
                    )));
                }
                tx.rename_cluster(id, &name, &to_name)?;
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Alter,
                    ObjectType::Cluster,
                    EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
                        id: id.to_string(),
                        old_name: name.clone(),
                        new_name: to_name.clone(),
                    }),
                )?;
                info!("rename cluster {name} to {to_name}");
            }
            Op::RenameClusterReplica {
                cluster_id,
                replica_id,
                name,
                to_name,
            } => {
                // Replicas may not take a reserved name.
                if is_reserved_name(&to_name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedReplicaName(to_name),
                    )));
                }
                tx.rename_cluster_replica(replica_id, &name, &to_name)?;
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Alter,
                    ObjectType::ClusterReplica,
                    EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
                        cluster_id: cluster_id.to_string(),
                        replica_id: replica_id.to_string(),
                        // `name` is a qualified replica name; only the replica
                        // component is recorded as the old name.
                        old_name: name.replica.as_str().to_string(),
                        new_name: to_name.clone(),
                    }),
                )?;
                info!("rename cluster replica {name} to {to_name}");
            }
2080            Op::RenameItem {
2081                id,
2082                to_name,
2083                current_full_name,
2084            } => {
2085                let mut updates = Vec::new();
2086
2087                let entry = state.get_entry(&id);
2088                if let CatalogItem::Type(_) = entry.item() {
2089                    return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
2090                        current_full_name.to_string(),
2091                    ))));
2092                }
2093
2094                if entry.id().is_system() {
2095                    let name = state
2096                        .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2097                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2098                        name.to_string(),
2099                    ))));
2100                }
2101
2102                let mut to_full_name = current_full_name.clone();
2103                to_full_name.item.clone_from(&to_name);
2104
2105                let mut to_qualified_name = entry.name().clone();
2106                to_qualified_name.item.clone_from(&to_name);
2107
2108                let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2109                    id: id.to_string(),
2110                    old_name: Self::full_name_detail(&current_full_name),
2111                    new_name: Self::full_name_detail(&to_full_name),
2112                });
2113                if Self::should_audit_log_item(entry.item()) {
2114                    CatalogState::add_to_audit_log(
2115                        &state.system_configuration,
2116                        oracle_write_ts,
2117                        session,
2118                        tx,
2119                        audit_events,
2120                        EventType::Alter,
2121                        catalog_type_to_audit_object_type(entry.item().typ()),
2122                        details,
2123                    )?;
2124                }
2125
2126                // Rename item itself.
2127                let mut new_entry = entry.clone();
2128                new_entry.name.item.clone_from(&to_name);
2129                new_entry.item = entry
2130                    .item()
2131                    .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2132                    .map_err(|e| {
2133                        Error::new(ErrorKind::from(AmbiguousRename {
2134                            depender: state
2135                                .resolve_full_name(entry.name(), entry.conn_id())
2136                                .to_string(),
2137                            dependee: state
2138                                .resolve_full_name(entry.name(), entry.conn_id())
2139                                .to_string(),
2140                            message: e,
2141                        }))
2142                    })?;
2143
2144                for id in entry.referenced_by() {
2145                    let dependent_item = state.get_entry(id);
2146                    let mut to_entry = dependent_item.clone();
2147                    to_entry.item = dependent_item
2148                        .item()
2149                        .rename_item_refs(
2150                            current_full_name.clone(),
2151                            to_full_name.item.clone(),
2152                            false,
2153                        )
2154                        .map_err(|e| {
2155                            Error::new(ErrorKind::from(AmbiguousRename {
2156                                depender: state
2157                                    .resolve_full_name(
2158                                        dependent_item.name(),
2159                                        dependent_item.conn_id(),
2160                                    )
2161                                    .to_string(),
2162                                dependee: state
2163                                    .resolve_full_name(entry.name(), entry.conn_id())
2164                                    .to_string(),
2165                                message: e,
2166                            }))
2167                        })?;
2168
2169                    if !to_entry.item().is_temporary() {
2170                        tx.update_item(*id, to_entry.into())?;
2171                    } else {
2172                        temporary_item_updates
2173                            .push((dependent_item.clone().into(), StateDiff::Retraction));
2174                        temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2175                    }
2176                    updates.push(*id);
2177                }
2178                if !new_entry.item().is_temporary() {
2179                    tx.update_item(id, new_entry.into())?;
2180                } else {
2181                    temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2182                    temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2183                }
2184
2185                updates.push(id);
2186                for id in updates {
2187                    Self::log_update(state, &id);
2188                }
2189            }
2190            Op::RenameSchema {
2191                database_spec,
2192                schema_spec,
2193                new_name,
2194                check_reserved_names,
2195            } => {
2196                if check_reserved_names && is_reserved_name(&new_name) {
2197                    return Err(AdapterError::Catalog(Error::new(
2198                        ErrorKind::ReservedSchemaName(new_name),
2199                    )));
2200                }
2201
2202                let conn_id = session
2203                    .map(|session| session.conn_id())
2204                    .unwrap_or(&SYSTEM_CONN_ID);
2205
2206                let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2207                let cur_name = schema.name().schema.clone();
2208
2209                let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2210                    return Err(AdapterError::Catalog(Error::new(
2211                        ErrorKind::AmbientSchemaRename(cur_name),
2212                    )));
2213                };
2214                let database = state.get_database(&database_id);
2215                let database_name = &database.name;
2216
2217                let mut updates = Vec::new();
2218                let mut items_to_update = BTreeMap::new();
2219
2220                let mut update_item = |id| {
2221                    if items_to_update.contains_key(id) {
2222                        return Ok(());
2223                    }
2224
2225                    let entry = state.get_entry(id);
2226
2227                    // Update our item.
2228                    let mut new_entry = entry.clone();
2229                    new_entry.item = entry
2230                        .item
2231                        .rename_schema_refs(database_name, &cur_name, &new_name)
2232                        .map_err(|(s, _i)| {
2233                            Error::new(ErrorKind::from(AmbiguousRename {
2234                                depender: state
2235                                    .resolve_full_name(entry.name(), entry.conn_id())
2236                                    .to_string(),
2237                                dependee: format!("{database_name}.{cur_name}"),
2238                                message: format!("ambiguous reference to schema named {s}"),
2239                            }))
2240                        })?;
2241
2242                    // Queue updates for Catalog storage and Builtin Tables.
2243                    if !new_entry.item().is_temporary() {
2244                        items_to_update.insert(*id, new_entry.into());
2245                    } else {
2246                        temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2247                        temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2248                    }
2249                    updates.push(id);
2250
2251                    Ok::<_, AdapterError>(())
2252                };
2253
2254                // Update all of the items in the schema.
2255                for (_name, item_id) in &schema.items {
2256                    // Update the item itself.
2257                    update_item(item_id)?;
2258
2259                    // Update everything that depends on this item.
2260                    for id in state.get_entry(item_id).referenced_by() {
2261                        update_item(id)?;
2262                    }
2263                }
2264                // Note: When updating the transaction it's very important that we update the
2265                // items as a whole group, otherwise we exhibit quadratic behavior.
2266                tx.update_items(items_to_update)?;
2267
2268                // Renaming temporary schemas is not supported.
2269                let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2270                    let schema_name = schema.name().schema.clone();
2271                    return Err(AdapterError::Catalog(crate::catalog::Error::new(
2272                        crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2273                    )));
2274                };
2275
2276                // Add an entry to the audit log.
2277                let database_name = database_spec
2278                    .id()
2279                    .map(|id| state.get_database(&id).name.clone());
2280                let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2281                    id: schema_id.to_string(),
2282                    old_name: schema.name().schema.clone(),
2283                    new_name: new_name.clone(),
2284                    database_name,
2285                });
2286                CatalogState::add_to_audit_log(
2287                    &state.system_configuration,
2288                    oracle_write_ts,
2289                    session,
2290                    tx,
2291                    audit_events,
2292                    EventType::Alter,
2293                    mz_audit_log::ObjectType::Schema,
2294                    details,
2295                )?;
2296
2297                // Update the schema itself.
2298                let mut new_schema = schema.clone();
2299                new_schema.name.schema.clone_from(&new_name);
2300                tx.update_schema(schema_id, new_schema.into())?;
2301
2302                for id in updates {
2303                    Self::log_update(state, id);
2304                }
2305            }
2306            Op::UpdateOwner { id, new_owner } => {
2307                let conn_id = session
2308                    .map(|session| session.conn_id())
2309                    .unwrap_or(&SYSTEM_CONN_ID);
2310                let old_owner = state
2311                    .get_owner_id(&id, conn_id)
2312                    .expect("cannot update the owner of an object without an owner");
2313                match &id {
2314                    ObjectId::Cluster(id) => {
2315                        let mut cluster = state.get_cluster(*id).clone();
2316                        if id.is_system() {
2317                            return Err(AdapterError::Catalog(Error::new(
2318                                ErrorKind::ReadOnlyCluster(cluster.name),
2319                            )));
2320                        }
2321                        Self::update_privilege_owners(
2322                            &mut cluster.privileges,
2323                            cluster.owner_id,
2324                            new_owner,
2325                        );
2326                        cluster.owner_id = new_owner;
2327                        tx.update_cluster(*id, cluster.into())?;
2328                    }
2329                    ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2330                        let cluster = state.get_cluster(*cluster_id);
2331                        let mut replica = cluster
2332                            .replica(*replica_id)
2333                            .expect("catalog out of sync")
2334                            .clone();
2335                        if replica_id.is_system() {
2336                            return Err(AdapterError::Catalog(Error::new(
2337                                ErrorKind::ReadOnlyClusterReplica(replica.name),
2338                            )));
2339                        }
2340                        replica.owner_id = new_owner;
2341                        tx.update_cluster_replica(*replica_id, replica.into())?;
2342                    }
2343                    ObjectId::Database(id) => {
2344                        let mut database = state.get_database(id).clone();
2345                        if id.is_system() {
2346                            return Err(AdapterError::Catalog(Error::new(
2347                                ErrorKind::ReadOnlyDatabase(database.name),
2348                            )));
2349                        }
2350                        Self::update_privilege_owners(
2351                            &mut database.privileges,
2352                            database.owner_id,
2353                            new_owner,
2354                        );
2355                        database.owner_id = new_owner;
2356                        tx.update_database(*id, database.clone().into())?;
2357                    }
2358                    ObjectId::Schema((database_spec, schema_spec)) => {
2359                        let schema_id: SchemaId = schema_spec.clone().into();
2360                        let mut schema = state
2361                            .get_schema(database_spec, schema_spec, conn_id)
2362                            .clone();
2363                        if schema_id.is_system() {
2364                            let name = schema.name();
2365                            let full_name = state.resolve_full_schema_name(name);
2366                            return Err(AdapterError::Catalog(Error::new(
2367                                ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2368                            )));
2369                        }
2370                        Self::update_privilege_owners(
2371                            &mut schema.privileges,
2372                            schema.owner_id,
2373                            new_owner,
2374                        );
2375                        schema.owner_id = new_owner;
2376                        tx.update_schema(schema_id, schema.into())?;
2377                    }
2378                    ObjectId::Item(id) => {
2379                        let entry = state.get_entry(id);
2380                        let mut new_entry = entry.clone();
2381                        if id.is_system() {
2382                            let full_name = state.resolve_full_name(
2383                                new_entry.name(),
2384                                session.map(|session| session.conn_id()),
2385                            );
2386                            return Err(AdapterError::Catalog(Error::new(
2387                                ErrorKind::ReadOnlyItem(full_name.to_string()),
2388                            )));
2389                        }
2390                        Self::update_privilege_owners(
2391                            &mut new_entry.privileges,
2392                            new_entry.owner_id,
2393                            new_owner,
2394                        );
2395                        new_entry.owner_id = new_owner;
2396                        if !new_entry.item().is_temporary() {
2397                            tx.update_item(*id, new_entry.into())?;
2398                        } else {
2399                            temporary_item_updates
2400                                .push((entry.clone().into(), StateDiff::Retraction));
2401                            temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2402                        }
2403                    }
2404                    ObjectId::NetworkPolicy(id) => {
2405                        let mut policy = state.get_network_policy(id).clone();
2406                        if id.is_system() {
2407                            return Err(AdapterError::Catalog(Error::new(
2408                                ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2409                            )));
2410                        }
2411                        Self::update_privilege_owners(
2412                            &mut policy.privileges,
2413                            policy.owner_id,
2414                            new_owner,
2415                        );
2416                        policy.owner_id = new_owner;
2417                        tx.update_network_policy(*id, policy.into())?;
2418                    }
2419                    ObjectId::Role(_) => unreachable!("roles have no owner"),
2420                }
2421                let object_type = state.get_object_type(&id);
2422                CatalogState::add_to_audit_log(
2423                    &state.system_configuration,
2424                    oracle_write_ts,
2425                    session,
2426                    tx,
2427                    audit_events,
2428                    EventType::Alter,
2429                    object_type_to_audit_object_type(object_type),
2430                    EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2431                        object_id: id.to_string(),
2432                        old_owner_id: old_owner.to_string(),
2433                        new_owner_id: new_owner.to_string(),
2434                    }),
2435                )?;
2436            }
2437            Op::UpdateClusterConfig { id, name, config } => {
2438                let mut cluster = state.get_cluster(id).clone();
2439                cluster.config = config;
2440                tx.update_cluster(id, cluster.into())?;
2441                info!("update cluster {}", name);
2442
2443                CatalogState::add_to_audit_log(
2444                    &state.system_configuration,
2445                    oracle_write_ts,
2446                    session,
2447                    tx,
2448                    audit_events,
2449                    EventType::Alter,
2450                    ObjectType::Cluster,
2451                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2452                        id: id.to_string(),
2453                        name,
2454                    }),
2455                )?;
2456            }
2457            Op::UpdateClusterReplicaConfig {
2458                replica_id,
2459                cluster_id,
2460                config,
2461            } => {
2462                let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2463                info!("update replica {}", replica.name);
2464                tx.update_cluster_replica(
2465                    replica_id,
2466                    mz_catalog::durable::ClusterReplica {
2467                        cluster_id,
2468                        replica_id,
2469                        name: replica.name.clone(),
2470                        config: config.clone().into(),
2471                        owner_id: replica.owner_id,
2472                    },
2473                )?;
2474            }
2475            Op::UpdateItem { id, name, to_item } => {
2476                let mut entry = state.get_entry(&id).clone();
2477                entry.name = name.clone();
2478                entry.item = to_item.clone();
2479                tx.update_item(id, entry.into())?;
2480
2481                if Self::should_audit_log_item(&to_item) {
2482                    let mut full_name = Self::full_name_detail(
2483                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2484                    );
2485                    full_name.item = name.item;
2486
2487                    CatalogState::add_to_audit_log(
2488                        &state.system_configuration,
2489                        oracle_write_ts,
2490                        session,
2491                        tx,
2492                        audit_events,
2493                        EventType::Alter,
2494                        catalog_type_to_audit_object_type(to_item.typ()),
2495                        EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2496                            id: id.to_string(),
2497                            name: full_name,
2498                        }),
2499                    )?;
2500                }
2501
2502                Self::log_update(state, &id);
2503            }
2504            Op::UpdateSystemConfiguration { name, value } => {
2505                let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2506                tx.upsert_system_config(&name, parsed_value.clone())?;
2507                // This mirrors some "system vars" into the catalog storage
2508                // "config" collection so that we can toggle the flag with
2509                // Launch Darkly, but use it in boot before Launch Darkly is
2510                // available.
2511                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2512                    let with_0dt_deployment_max_wait =
2513                        Duration::parse(VarInput::Flat(&parsed_value))
2514                            .expect("parsing succeeded above");
2515                    tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2516                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2517                    let with_0dt_deployment_ddl_check_interval =
2518                        Duration::parse(VarInput::Flat(&parsed_value))
2519                            .expect("parsing succeeded above");
2520                    tx.set_0dt_deployment_ddl_check_interval(
2521                        with_0dt_deployment_ddl_check_interval,
2522                    )?;
2523                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2524                    let panic_after_timeout =
2525                        strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2526                    tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2527                }
2528
2529                CatalogState::add_to_audit_log(
2530                    &state.system_configuration,
2531                    oracle_write_ts,
2532                    session,
2533                    tx,
2534                    audit_events,
2535                    EventType::Alter,
2536                    ObjectType::System,
2537                    EventDetails::SetV1(mz_audit_log::SetV1 {
2538                        name,
2539                        value: Some(value.borrow().to_vec().join(", ")),
2540                    }),
2541                )?;
2542            }
2543            Op::ResetSystemConfiguration { name } => {
2544                tx.remove_system_config(&name);
2545                // This mirrors some "system vars" into the catalog storage
2546                // "config" collection so that we can toggle the flag with
2547                // Launch Darkly, but use it in boot before Launch Darkly is
2548                // available.
2549                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2550                    tx.reset_0dt_deployment_max_wait()?;
2551                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2552                    tx.reset_0dt_deployment_ddl_check_interval()?;
2553                }
2554
2555                CatalogState::add_to_audit_log(
2556                    &state.system_configuration,
2557                    oracle_write_ts,
2558                    session,
2559                    tx,
2560                    audit_events,
2561                    EventType::Alter,
2562                    ObjectType::System,
2563                    EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2564                )?;
2565            }
2566            Op::ResetAllSystemConfiguration => {
2567                tx.clear_system_configs();
2568                tx.reset_0dt_deployment_max_wait()?;
2569                tx.reset_0dt_deployment_ddl_check_interval()?;
2570
2571                CatalogState::add_to_audit_log(
2572                    &state.system_configuration,
2573                    oracle_write_ts,
2574                    session,
2575                    tx,
2576                    audit_events,
2577                    EventType::Alter,
2578                    ObjectType::System,
2579                    EventDetails::ResetAllV1,
2580                )?;
2581            }
2582            Op::WeirdStorageUsageUpdates {
2583                object_id,
2584                size_bytes,
2585                collection_timestamp,
2586            } => {
2587                let id = tx.allocate_storage_usage_ids()?;
2588                let metric =
2589                    VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2590                let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2591                let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2592                weird_builtin_table_update = Some(builtin_table_update);
2593            }
2594        };
2595        Ok((weird_builtin_table_update, temporary_item_updates))
2596    }
2597
2598    fn log_update(state: &CatalogState, id: &CatalogItemId) {
2599        let entry = state.get_entry(id);
2600        info!(
2601            "update {} {} ({})",
2602            entry.item_type(),
2603            state.resolve_full_name(entry.name(), entry.conn_id()),
2604            id
2605        );
2606    }
2607
2608    /// Update privileges to reflect the new owner. Based off of PostgreSQL's
2609    /// implementation:
2610    /// <https://github.com/postgres/postgres/blob/43a33ef54e503b61f269d088f2623ba3b9484ad7/src/backend/utils/adt/acl.c#L1078-L1177>
2611    fn update_privilege_owners(
2612        privileges: &mut PrivilegeMap,
2613        old_owner: RoleId,
2614        new_owner: RoleId,
2615    ) {
2616        // TODO(jkosh44) Would be nice not to clone every privilege.
2617        let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2618
2619        let mut new_present = false;
2620        for privilege in flat_privileges.iter_mut() {
2621            // Old owner's granted privilege are updated to be granted by the new
2622            // owner.
2623            if privilege.grantor == old_owner {
2624                privilege.grantor = new_owner;
2625            } else if privilege.grantor == new_owner {
2626                new_present = true;
2627            }
2628            // Old owner's privileges is given to the new owner.
2629            if privilege.grantee == old_owner {
2630                privilege.grantee = new_owner;
2631            } else if privilege.grantee == new_owner {
2632                new_present = true;
2633            }
2634        }
2635
2636        // If the old privilege list contained references to the new owner, we may
2637        // have created duplicate entries. Here we try and consolidate them. This
2638        // is inspired by PostgreSQL's algorithm but not identical.
2639        if new_present {
2640            // Group privileges by (grantee, grantor).
2641            let privilege_map: BTreeMap<_, Vec<_>> =
2642                flat_privileges
2643                    .into_iter()
2644                    .fold(BTreeMap::new(), |mut accum, privilege| {
2645                        accum
2646                            .entry((privilege.grantee, privilege.grantor))
2647                            .or_default()
2648                            .push(privilege);
2649                        accum
2650                    });
2651
2652            // Consolidate and update all privileges.
2653            flat_privileges = privilege_map
2654                .into_iter()
2655                .map(|((grantee, grantor), values)|
2656                    // Combine the acl_mode of all mz_aclitems with the same grantee and grantor.
2657                    values.into_iter().fold(
2658                        MzAclItem::empty(grantee, grantor),
2659                        |mut accum, mz_aclitem| {
2660                            accum.acl_mode =
2661                                accum.acl_mode.union(mz_aclitem.acl_mode);
2662                            accum
2663                        },
2664                    ))
2665                .collect();
2666        }
2667
2668        *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2669    }
2670}
2671
2672/// Prepare the given transaction for replacing a catalog item with a new version.
2673///
2674/// The new version gets a new `CatalogItemId`, which requires rewriting the `create_sql` of all
2675/// dependent objects to refer to that new ID (at a previous version).
2676///
2677/// Note that here is where we break the assumption that the `CatalogItemId` is a stable identifier
2678/// for catalog items. We currently think that there are no use cases that require this assumption,
2679/// but no way to know for sure.
fn tx_replace_item(
    tx: &mut Transaction<'_>,
    state: &CatalogState,
    id: CatalogItemId,
    new_entry: CatalogEntry,
) -> Result<(), AdapterError> {
    let new_id = new_entry.id;

    // Rewrite dependent objects to point to the new ID.
    for use_id in new_entry.referenced_by() {
        // The dependent might be dropped in the same tx, so check.
        if tx.get_item(use_id).is_none() {
            continue;
        }

        // Clone the in-memory entry and rewrite its `create_sql` references
        // from the old ID to the new one before persisting the update.
        let mut dependent = state.get_entry(use_id).clone();
        dependent.item = dependent.item.replace_item_refs(id, new_id);
        tx.update_item(*use_id, dependent.into())?;
    }

    // Move comments to the new ID. Comments are keyed by object ID, so the
    // entries under the old comment ID are dropped and re-inserted under the
    // new one.
    let old_comment_id = state.get_comment_id(ObjectId::Item(id));
    let new_comment_id = new_entry.comment_object_id();
    if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
        tx.drop_comments(&[old_comment_id].into())?;
        for (sub, comment) in comments {
            tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
        }
    }

    // Destructure the durable representation of the new entry. Its own `id`
    // field is deliberately ignored: the item is re-inserted under `new_id`
    // below.
    let mz_catalog::durable::Item {
        id: _,
        oid,
        global_id,
        schema_id,
        name,
        create_sql,
        owner_id,
        privileges,
        extra_versions,
    } = new_entry.into();

    // Swap the old item for the new one: remove the entry under the old ID,
    // then insert the same definition under the new ID.
    tx.remove_item(id)?;
    tx.insert_item(
        new_id,
        oid,
        global_id,
        schema_id,
        &name,
        create_sql,
        owner_id,
        privileges,
        extra_versions,
    )?;

    Ok(())
}
2737
2738/// Generate audit events for a replacement apply operation.
2739fn apply_replacement_audit_events(
2740    state: &CatalogState,
2741    target: &CatalogEntry,
2742    replacement: &CatalogEntry,
2743) -> Vec<(EventType, EventDetails)> {
2744    let mut events = Vec::new();
2745
2746    let target_name = state.resolve_full_name(target.name(), target.conn_id());
2747    let target_id_name = IdFullNameV1 {
2748        id: target.id().to_string(),
2749        name: Catalog::full_name_detail(&target_name),
2750    };
2751    let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2752    let replacement_id_name = IdFullNameV1 {
2753        id: replacement.id().to_string(),
2754        name: Catalog::full_name_detail(&replacement_name),
2755    };
2756
2757    if Catalog::should_audit_log_item(&replacement.item) {
2758        events.push((
2759            EventType::Drop,
2760            EventDetails::IdFullNameV1(replacement_id_name.clone()),
2761        ));
2762    }
2763
2764    if Catalog::should_audit_log_item(&target.item) {
2765        events.push((
2766            EventType::Alter,
2767            EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2768                target: target_id_name.clone(),
2769                replacement: replacement_id_name,
2770            }),
2771        ));
2772
2773        if let Some(old_cluster_id) = target.cluster_id()
2774            && let Some(new_cluster_id) = replacement.cluster_id()
2775            && old_cluster_id != new_cluster_id
2776        {
2777            // When the replacement is applied, the target takes on the ID of the replacement, so
2778            // we should use that ID for subsequent events.
2779            events.push((
2780                EventType::Alter,
2781                EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
2782                    id: replacement.id().to_string(),
2783                    name: target_id_name.name,
2784                    old_cluster_id: old_cluster_id.to_string(),
2785                    new_cluster_id: new_cluster_id.to_string(),
2786                }),
2787            ));
2788        }
2789    }
2790
2791    events
2792}
2793
/// All of the objects that need to be removed in response to an [`Op::DropObjects`].
///
/// Note: Previously we used to emit a single `Op::DropObject` for every object
/// we needed to drop. But removing a batch of objects from a durable Catalog
/// Transaction is O(n) where `n` is the number of objects that exist in the
/// Catalog. This resulted in an unacceptable `O(m * n)` performance for a
/// `DROP ... CASCADE` statement.
#[derive(Debug, Default)]
pub(crate) struct ObjectsToDrop {
    /// Comment objects associated with any of the dropped objects.
    pub comments: BTreeSet<CommentObjectId>,
    /// Databases to drop.
    pub databases: BTreeSet<DatabaseId>,
    /// Schemas to drop, each mapped to the database it belongs to.
    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
    /// Clusters to drop.
    pub clusters: BTreeSet<ClusterId>,
    /// Replicas to drop, each mapped to its cluster and the reason for the drop.
    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
    /// Roles to drop.
    pub roles: BTreeSet<RoleId>,
    /// Catalog items to drop.
    pub items: Vec<CatalogItemId>,
    /// Network policies to drop.
    pub network_policies: BTreeSet<NetworkPolicyId>,
}
2812
2813impl ObjectsToDrop {
2814    pub fn generate(
2815        drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2816        state: &CatalogState,
2817        session: Option<&ConnMeta>,
2818    ) -> Result<Self, AdapterError> {
2819        let mut delta = ObjectsToDrop::default();
2820
2821        for drop_object_info in drop_object_infos {
2822            delta.add_item(drop_object_info, state, session)?;
2823        }
2824
2825        Ok(delta)
2826    }
2827
2828    fn add_item(
2829        &mut self,
2830        drop_object_info: DropObjectInfo,
2831        state: &CatalogState,
2832        session: Option<&ConnMeta>,
2833    ) -> Result<(), AdapterError> {
2834        self.comments
2835            .insert(state.get_comment_id(drop_object_info.to_object_id()));
2836
2837        match drop_object_info {
2838            DropObjectInfo::Database(database_id) => {
2839                let database = &state.database_by_id[&database_id];
2840                if database_id.is_system() {
2841                    return Err(AdapterError::Catalog(Error::new(
2842                        ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2843                    )));
2844                }
2845
2846                self.databases.insert(database_id);
2847            }
2848            DropObjectInfo::Schema((database_spec, schema_spec)) => {
2849                let schema = state.get_schema(
2850                    &database_spec,
2851                    &schema_spec,
2852                    session
2853                        .map(|session| session.conn_id())
2854                        .unwrap_or(&SYSTEM_CONN_ID),
2855                );
2856                let schema_id: SchemaId = schema_spec.into();
2857                if schema_id.is_system() {
2858                    let name = schema.name();
2859                    let full_name = state.resolve_full_schema_name(name);
2860                    return Err(AdapterError::Catalog(Error::new(
2861                        ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2862                    )));
2863                }
2864
2865                self.schemas.insert(schema_spec, database_spec);
2866            }
2867            DropObjectInfo::Role(role_id) => {
2868                let name = state.get_role(&role_id).name().to_string();
2869                if role_id.is_system() || role_id.is_predefined() {
2870                    return Err(AdapterError::Catalog(Error::new(
2871                        ErrorKind::ReservedRoleName(name.clone()),
2872                    )));
2873                }
2874                state.ensure_not_reserved_role(&role_id)?;
2875
2876                self.roles.insert(role_id);
2877            }
2878            DropObjectInfo::Cluster(cluster_id) => {
2879                let cluster = state.get_cluster(cluster_id);
2880                let name = &cluster.name;
2881                if cluster_id.is_system() {
2882                    return Err(AdapterError::Catalog(Error::new(
2883                        ErrorKind::ReadOnlyCluster(name.clone()),
2884                    )));
2885                }
2886
2887                self.clusters.insert(cluster_id);
2888            }
2889            DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2890                let cluster = state.get_cluster(cluster_id);
2891                let replica = cluster.replica(replica_id).expect("Must exist");
2892
2893                self.replicas
2894                    .insert(replica.replica_id, (cluster.id, reason));
2895
2896                // Implicitly drop materialized views that target this replica.
2897                // When the target replica is gone, no replica advances the
2898                // persist shard's upper frontier, causing reads to hang.
2899                for (_id, entry) in state.get_entries() {
2900                    if let CatalogItem::MaterializedView(mv) = entry.item() {
2901                        if mv.target_replica == Some(replica_id)
2902                            && !self.items.contains(&entry.id())
2903                        {
2904                            tracing::warn!(
2905                                "implicitly dropping materialized view {} because target replica was dropped",
2906                                entry.name().item,
2907                            );
2908                            self.items.push(entry.id());
2909                        }
2910                    }
2911                }
2912            }
2913            DropObjectInfo::Item(item_id) => {
2914                let entry = state.get_entry(&item_id);
2915                if item_id.is_system() {
2916                    let name = entry.name();
2917                    let full_name =
2918                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
2919                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2920                        full_name.to_string(),
2921                    ))));
2922                }
2923
2924                self.items.push(item_id);
2925            }
2926            DropObjectInfo::NetworkPolicy(network_policy_id) => {
2927                let policy = state.get_network_policy(&network_policy_id);
2928                let name = &policy.name;
2929                if network_policy_id.is_system() {
2930                    return Err(AdapterError::Catalog(Error::new(
2931                        ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
2932                    )));
2933                }
2934
2935                self.network_policies.insert(network_policy_id);
2936            }
2937        }
2938
2939        Ok(())
2940    }
2941}
2942
#[cfg(test)]
mod tests {
    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
    use mz_repr::role_id::RoleId;

    use crate::catalog::Catalog;

    #[mz_ore::test]
    fn test_update_privilege_owners() {
        let old_owner = RoleId::User(1);
        let new_owner = RoleId::User(2);
        let other_role = RoleId::User(3);

        // Applies `update_privilege_owners` to `items` and asserts that the
        // result collapses to exactly the single `expected` privilege.
        let check = |items: Vec<MzAclItem>, expected: MzAclItem| {
            let mut privileges = PrivilegeMap::from_mz_acl_items(items);
            Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
            assert_eq!(1, privileges.all_values().count());
            assert_eq!(
                vec![expected],
                privileges.all_values_owned().collect::<Vec<_>>()
            );
        };

        // older owner exists as grantor.
        check(
            vec![
                MzAclItem {
                    grantee: other_role,
                    grantor: old_owner,
                    acl_mode: AclMode::UPDATE,
                },
                MzAclItem {
                    grantee: other_role,
                    grantor: new_owner,
                    acl_mode: AclMode::SELECT,
                },
            ],
            MzAclItem {
                grantee: other_role,
                grantor: new_owner,
                acl_mode: AclMode::SELECT.union(AclMode::UPDATE),
            },
        );

        // older owner exists as grantee.
        check(
            vec![
                MzAclItem {
                    grantee: old_owner,
                    grantor: other_role,
                    acl_mode: AclMode::UPDATE,
                },
                MzAclItem {
                    grantee: new_owner,
                    grantor: other_role,
                    acl_mode: AclMode::SELECT,
                },
            ],
            MzAclItem {
                grantee: new_owner,
                grantor: other_role,
                acl_mode: AclMode::SELECT.union(AclMode::UPDATE),
            },
        );

        // older owner exists as grantee and grantor.
        check(
            vec![
                MzAclItem {
                    grantee: old_owner,
                    grantor: old_owner,
                    acl_mode: AclMode::UPDATE,
                },
                MzAclItem {
                    grantee: new_owner,
                    grantor: new_owner,
                    acl_mode: AclMode::SELECT,
                },
            ],
            MzAclItem {
                grantee: new_owner,
                grantor: new_owner,
                acl_mode: AclMode::SELECT.union(AclMode::UPDATE),
            },
        );
    }
}